From 02c5f585fac952f776d645e2908eeeb544d9e735 Mon Sep 17 00:00:00 2001 From: Ming Meng Date: Sun, 18 Jun 2023 12:43:09 -0400 Subject: [PATCH 1/6] support oauth for pulsar scaler Signed-off-by: Ming Meng --- CHANGELOG.md | 2 + .../authentication/authentication_helpers.go | 19 ++- .../authentication/authentication_types.go | 7 ++ pkg/scalers/pulsar_scaler.go | 38 +++++- pkg/scalers/pulsar_scaler_test.go | 117 ++++++++++++++---- 5 files changed, 158 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ecfb8153acf..7c256a0f161 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **CPU/Memory scaler**: Add support for scale to zero if there are multiple triggers([#4269](https://github.com/kedacore/keda/issues/4269)) - **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277)) - **General**: Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234)) +- **Pulsar Scaler**: Add support for OAuth extensions ([#4700](https://github.com/kedacore/keda/issues/4700)) ### Improvements @@ -101,6 +102,7 @@ New deprecation(s): - **General**: Refactor several functions for Status & Conditions handling into pkg util functions ([#2906](https://github.com/kedacore/keda/pull/2906)) - **General**: Bump `kubernetes-sigs/controller-runtime` to v0.15.0 and code alignment ([#4582](https://github.com/kedacore/keda/pull/4582)) + ## v2.10.1 ### Fixes diff --git a/pkg/scalers/authentication/authentication_helpers.go b/pkg/scalers/authentication/authentication_helpers.go index a77bc1c7333..bb2fb883e7f 100644 --- a/pkg/scalers/authentication/authentication_helpers.go +++ b/pkg/scalers/authentication/authentication_helpers.go @@ -41,8 +41,10 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet if out.EnableBasicAuth { return nil, errors.New("both bearer and basic authentication can not be set") } - - out.BearerToken = authParams["bearerToken"] + if out.EnableOAuth { + return nil, errors.New("both bearer and OAuth can not be set") + } + out.BearerToken = strings.TrimSuffix(authParams["bearerToken"], "\n") out.EnableBearerAuth = true case BasicAuthType: if len(authParams["username"]) == 0 { @@ -51,6 +53,9 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet if out.EnableBearerAuth { return nil, errors.New("both bearer and basic authentication can not be set") } + if out.EnableOAuth { + return nil, errors.New("both bearer and OAuth can not be set") + } out.Username = authParams["username"] // password is optional. For convenience, many application implement basic auth with @@ -80,6 +85,16 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet } out.CustomAuthValue = authParams["customAuthValue"] out.EnableCustomAuth = true + case OAuthType: + if out.EnableBasicAuth { + return nil, errors.New("both oauth and basic authentication can not be set") + } + if out.EnableBearerAuth { + return nil, errors.New("both oauth and bearer authentication can not be set") + } + out.EnableOAuth = true + out.ClientID = authParams["clientID"] + out.ClientSecret = authParams["clientSecret"] default: return nil, fmt.Errorf("incorrect value for authMode is given: %s", t) } diff --git a/pkg/scalers/authentication/authentication_types.go b/pkg/scalers/authentication/authentication_types.go index d6484dce233..0e0a50c5d10 100644 --- a/pkg/scalers/authentication/authentication_types.go +++ b/pkg/scalers/authentication/authentication_types.go @@ -16,6 +16,8 @@ const ( BearerAuthType Type = "bearer" // CustomAuthType is a auth type using a custom header CustomAuthType Type = "custom" + // OAuthType is a auth type using a oAuth2 + OAuthType Type = "oauth" ) // TransportType is type of http transport @@ -42,6 +44,11 @@ type AuthMeta struct { Key string CA string + // oAuth2 + EnableOAuth bool + ClientID string + ClientSecret string + // custom auth header EnableCustomAuth bool CustomAuthHeader string diff --git a/pkg/scalers/pulsar_scaler.go b/pkg/scalers/pulsar_scaler.go index 498b36f6f33..a68b610a937 100644 --- a/pkg/scalers/pulsar_scaler.go +++ b/pkg/scalers/pulsar_scaler.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/go-logr/logr" + "golang.org/x/oauth2/clientcredentials" v2 "k8s.io/api/autoscaling/v2" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/metrics/pkg/apis/external_metrics" @@ -32,6 +33,11 @@ type pulsarMetadata struct { msgBacklogThreshold int64 activationMsgBacklogThreshold int64 + oauthTokenURI string + grantType string + scopes []string + clientID string + pulsarAuth *authentication.AuthMeta statsURL string @@ -165,6 +171,16 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) { return meta, errors.New("no subscription given") } + if config.TriggerMetadata["oauthTokenURI"] != "" { + meta.oauthTokenURI = config.TriggerMetadata["oauthTokenURI"] + } + if config.TriggerMetadata["grantType"] != "" { + meta.grantType = config.TriggerMetadata["grantType"] + } + if config.TriggerMetadata["scope"] != "" { + meta.scopes = strings.Split(config.TriggerMetadata["scope"], " ") + } + meta.metricName = fmt.Sprintf("%s-%s-%s", "pulsar", meta.topic, meta.subscription) meta.activationMsgBacklogThreshold = 0 @@ -199,6 +215,16 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) { if err != nil { return meta, fmt.Errorf("error parsing %s: %w", msgBacklogMetricName, err) } + + if auth != nil && auth.EnableOAuth { + // use clientID from authenticationRef if provided + // otherwise from the metadata + if auth.ClientID != "" { + meta.clientID = auth.ClientID + } else { + meta.clientID = config.TriggerMetadata["clientID"] + } + } meta.pulsarAuth = auth meta.scalerIndex = config.ScalerIndex return meta, nil @@ -212,9 +238,19 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) { return nil, fmt.Errorf("error requesting stats from admin url: %w", err) } + client := s.client + if s.metadata.pulsarAuth.EnableOAuth { + config := clientcredentials.Config{ + ClientID: s.metadata.clientID, + ClientSecret: s.metadata.pulsarAuth.ClientSecret, + TokenURL: s.metadata.oauthTokenURI, + Scopes: s.metadata.scopes, + } + client = config.Client(context.Background()) + } addAuthHeaders(req, &s.metadata) - res, err := s.client.Do(req) + res, err := client.Do(req) if err != nil { return nil, fmt.Errorf("error requesting stats from admin url: %w", err) } diff --git a/pkg/scalers/pulsar_scaler_test.go b/pkg/scalers/pulsar_scaler_test.go index 549546ef795..d994bf5a784 100644 --- a/pkg/scalers/pulsar_scaler_test.go +++ b/pkg/scalers/pulsar_scaler_test.go @@ -17,6 +17,7 @@ type parsePulsarMetadataTestData struct { adminURL string topic string subscription string + oauthData map[string]string } type parsePulsarAuthParamsTestData struct { @@ -30,6 +31,9 @@ type parsePulsarAuthParamsTestData struct { bearerToken string username string password string + enableOAuth bool + clientID string + clientSecret string } type pulsarMetricIdentifier struct { @@ -39,9 +43,11 @@ type pulsarMetricIdentifier struct { // A complete valid authParams example for sasl, with username and passwd var validPulsarWithAuthParams = map[string]string{ - "cert": "certdata", - "key": "keydata", - "ca": "cadata", + "cert": "certdata", + "key": "keydata", + "ca": "cadata", + "clientID": "clientIDdata", + "clientSecret": "clientSecretdata", } // A complete valid authParams example for sasl, without username and passwd @@ -49,36 +55,46 @@ var validPulsarWithoutAuthParams = map[string]string{} var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{ // failure, no adminURL - {map[string]string{}, true, false, false, "", "", ""}, - {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""}, - {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""}, - {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic"}, true, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", ""}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub1"}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub2"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub2"}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub3"}, false, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub3"}, - {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"}, - {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"}, - {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "isPartitionedTopic": "true", "subscription": "sub1"}, false, false, true, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"}, + {map[string]string{}, true, false, false, "", "", "", nil}, + {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", "", nil}, + {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", "", nil}, + {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", "", nil}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic"}, true, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "", nil}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub1", nil}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub2"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub2", nil}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub3"}, false, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub3", nil}, + {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1", nil}, + {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1", nil}, + {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "isPartitionedTopic": "true", "subscription": "sub1"}, false, false, true, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1", nil}, // tls - {map[string]string{"adminURL": "https://localhost:8443", "tls": "enable", "cert": "certdata", "key": "keydata", "ca": "cadata", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1"}, + {map[string]string{"adminURL": "https://localhost:8443", "tls": "enable", "cert": "certdata", "key": "keydata", "ca": "cadata", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1", nil}, + + // oauth + {map[string]string{"adminURL": "https://localhost:8443", "authModes": "oauth", "grantType": "client_credentials", "authTokenURI": "https://localhost/token", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1", map[string]string{"grantType": "client_credentials", "authTokenURI": "https://localhost/token"}}, + {map[string]string{"adminURL": "https://localhost:8443", "authModes": "oauth", "grantType": "client_credentials", "authTokenURI": "https://localhost/token", "scope": "sw:scope1", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1", map[string]string{"grantType": "client_credentials", "authTokenURI": "https://localhost/token", "scope": "sw:scope1"}}, + {map[string]string{"adminURL": "https://localhost:8443", "authModes": "oauth", "grantType": "client_credentials", "authTokenURI": "https://localhost/token", "scope": "sw:scope1 sw:scope2", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1", map[string]string{"grantType": "client_credentials", "authTokenURI": "https://localhost/token", "scope": "sw:scope1 sw:scope2"}}, } var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{ // Passes, mutual TLS, no other auth (legacy "tls: enable") - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", ""}, // Passes, mutual TLS, no other auth (uses new way to enable tls) - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", ""}, // Fails, mutual TLS (legacy "tls: enable") without cert - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", ""}, // Fails, mutual TLS, (uses new way to enable tls) without cert - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", ""}, // Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS. // The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca. - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", ""}, // Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS. - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", ""}, + // Passes, server side TLS with oauth. Note that EnableTLS is expected to be false because it is not mTLS. + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "id1", "secret123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "clientID": "id2"}, map[string]string{"ca": "cadata", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "id1", "secret123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "clientID": "id2"}, map[string]string{"ca": "cadata", "clientID": "", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "id2", "secret123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "clientID": "id1"}, map[string]string{}, false, false, "", "", "", "", "", "", false, "id1", ""}, } var pulsarMetricIdentifiers = []pulsarMetricIdentifier{ @@ -145,9 +161,44 @@ func TestParsePulsarMetadata(t *testing.T) { if meta.subscription != testData.subscription { t.Errorf("Expected subscription %s but got %s\n", testData.subscription, meta.subscription) } + + if testData.oauthData != nil { + if meta.oauthTokenURI != testData.oauthData["oauthTokenURI"] { + t.Errorf("Expected oauthTokenURI %s but got %s\n", testData.oauthData["oauthTokenURI"], meta.oauthTokenURI) + } + if meta.grantType != testData.oauthData["grantType"] { + t.Errorf("Expected grantType %s but got %s\n", testData.oauthData["grantType"], meta.grantType) + } + if testData.oauthData["scope"] != "" && !compareScope(meta.scopes, testData.oauthData["scope"]) { + t.Errorf("Expected scopes %s but got %s\n", testData.oauthData["scope"], meta.scopes) + } + if testData.oauthData["scope"] == "" && meta.scopes != nil { + t.Errorf("Expected scopes to be null but got %s\n", meta.scopes) + } + if meta.clientID != testData.oauthData["clientID"] { + t.Errorf("Expected clientID %s but got %s\n", testData.oauthData["clientID"], meta.clientID) + } + } } } +func compareScope(scopes []string, scopeStr string) bool { + scopeMap := make(map[string]bool) + + for _, scope := range scopes { + scopeMap[scope] = true + } + + scopeList := strings.Fields(scopeStr) + for _, scope := range scopeList { + if !scopeMap[scope] { + return false + } + } + + return true +} + func TestPulsarAuthParams(t *testing.T) { for _, testData := range parsePulsarMetadataTestAuthTLSDataset { meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}) @@ -189,7 +240,12 @@ func TestPulsarAuthParams(t *testing.T) { } if meta.pulsarAuth.EnableBasicAuth != (testData.username != "" || testData.password != "") { - t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken) + if testData.username != "" { + t.Errorf("Expected EnableBasicAuth to be true when username is %s\n", testData.username) + } + if testData.password != "" { + t.Errorf("Expected EnableBasicAuth to be true when password is %s\n", testData.password) + } } if meta.pulsarAuth.Username != testData.username { @@ -199,6 +255,23 @@ func TestPulsarAuthParams(t *testing.T) { if meta.pulsarAuth.Password != testData.password { t.Errorf("Expected password to be set to %s but got %s\n", testData.password, meta.pulsarAuth.Password) } + + if meta.pulsarAuth.EnableOAuth != (testData.clientID != "" || testData.clientSecret != "") { + if testData.clientID != "" { + t.Errorf("Expected EnableOAuth to be true when clientID is %s\n", testData.clientID) + } + if testData.clientSecret != "" { + t.Errorf("Expected EnableOAuth to be true when clientSecret is %s\n", testData.clientSecret) + } + } + + if meta.clientID != testData.clientID { + t.Errorf("Expected clientID to be set to %s but got %s\n", testData.clientID, meta.clientID) + } + + if meta.pulsarAuth.ClientSecret != testData.clientSecret { + t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.pulsarAuth.ClientSecret) + } } } From c0615609ec6063b30024291fb235f87db9c2b8ea Mon Sep 17 00:00:00 2001 From: Ming Meng Date: Wed, 21 Jun 2023 17:17:41 -0400 Subject: [PATCH 2/6] updated per review Signed-off-by: Ming Meng --- .../authentication/authentication_helpers.go | 4 + .../authentication/authentication_types.go | 8 +- pkg/scalers/pulsar_scaler.go | 51 +++++---- pkg/scalers/pulsar_scaler_test.go | 103 ++++++++---------- 4 files changed, 83 insertions(+), 83 deletions(-) diff --git a/pkg/scalers/authentication/authentication_helpers.go b/pkg/scalers/authentication/authentication_helpers.go index bb2fb883e7f..207248f8e30 100644 --- a/pkg/scalers/authentication/authentication_helpers.go +++ b/pkg/scalers/authentication/authentication_helpers.go @@ -93,6 +93,10 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet return nil, errors.New("both oauth and bearer authentication can not be set") } out.EnableOAuth = true + out.OauthTokenURI = authParams["oauthTokenURI"] + if authParams["scope"] != "" { + out.Scopes = strings.Split(authParams["scope"], " ") + } out.ClientID = authParams["clientID"] out.ClientSecret = authParams["clientSecret"] default: diff --git a/pkg/scalers/authentication/authentication_types.go b/pkg/scalers/authentication/authentication_types.go index 0e0a50c5d10..d79eb0d40d0 100644 --- a/pkg/scalers/authentication/authentication_types.go +++ b/pkg/scalers/authentication/authentication_types.go @@ -45,9 +45,11 @@ type AuthMeta struct { CA string // oAuth2 - EnableOAuth bool - ClientID string - ClientSecret string + EnableOAuth bool + OauthTokenURI string + Scopes []string + ClientID string + ClientSecret string // custom auth header EnableCustomAuth bool diff --git a/pkg/scalers/pulsar_scaler.go b/pkg/scalers/pulsar_scaler.go index a68b610a937..7ecf1a7395d 100644 --- a/pkg/scalers/pulsar_scaler.go +++ b/pkg/scalers/pulsar_scaler.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "reflect" "strconv" "strings" @@ -33,11 +34,6 @@ type pulsarMetadata struct { msgBacklogThreshold int64 activationMsgBacklogThreshold int64 - oauthTokenURI string - grantType string - scopes []string - clientID string - pulsarAuth *authentication.AuthMeta statsURL string @@ -171,16 +167,6 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) { return meta, errors.New("no subscription given") } - if config.TriggerMetadata["oauthTokenURI"] != "" { - meta.oauthTokenURI = config.TriggerMetadata["oauthTokenURI"] - } - if config.TriggerMetadata["grantType"] != "" { - meta.grantType = config.TriggerMetadata["grantType"] - } - if config.TriggerMetadata["scope"] != "" { - meta.scopes = strings.Split(config.TriggerMetadata["scope"], " ") - } - meta.metricName = fmt.Sprintf("%s-%s-%s", "pulsar", meta.topic, meta.subscription) meta.activationMsgBacklogThreshold = 0 @@ -217,12 +203,10 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) { } if auth != nil && auth.EnableOAuth { - // use clientID from authenticationRef if provided - // otherwise from the metadata - if auth.ClientID != "" { - meta.clientID = auth.ClientID - } else { - meta.clientID = config.TriggerMetadata["clientID"] + auth.OauthTokenURI = readOAuthConfig(auth, config.TriggerMetadata, "OauthTokenURI") + auth.ClientID = readOAuthConfig(auth, config.TriggerMetadata, "ClientID") + if auth.Scopes == nil && config.TriggerMetadata["scope"] != "" { + auth.Scopes = strings.Split(config.TriggerMetadata["scope"], " ") } } meta.pulsarAuth = auth @@ -230,6 +214,25 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) { return meta, nil } +// use values from authenticationRef if provided, otherwise try the metadata +func readOAuthConfig(auth *authentication.AuthMeta, TriggerMetadata map[string]string, key string) string { + authValue := reflect.ValueOf(auth).Elem() + value := authValue.FieldByName(key) + if value.IsValid() { + val := value.Interface().(string) + if val != "" { + return val + } + } + + jsonKey := strings.ToLower(string(key[0])) + key[1:] + if value, ok := TriggerMetadata[jsonKey]; ok { + return fmt.Sprintf("%v", value) + } + + return "" +} + func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) { stats := new(pulsarStats) @@ -241,10 +244,10 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) { client := s.client if s.metadata.pulsarAuth.EnableOAuth { config := clientcredentials.Config{ - ClientID: s.metadata.clientID, + ClientID: s.metadata.pulsarAuth.ClientID, ClientSecret: s.metadata.pulsarAuth.ClientSecret, - TokenURL: s.metadata.oauthTokenURI, - Scopes: s.metadata.scopes, + TokenURL: s.metadata.pulsarAuth.OauthTokenURI, + Scopes: s.metadata.pulsarAuth.Scopes, } client = config.Client(context.Background()) } diff --git a/pkg/scalers/pulsar_scaler_test.go b/pkg/scalers/pulsar_scaler_test.go index d994bf5a784..1c8866122f6 100644 --- a/pkg/scalers/pulsar_scaler_test.go +++ b/pkg/scalers/pulsar_scaler_test.go @@ -17,7 +17,6 @@ type parsePulsarMetadataTestData struct { adminURL string topic string subscription string - oauthData map[string]string } type parsePulsarAuthParamsTestData struct { @@ -32,6 +31,8 @@ type parsePulsarAuthParamsTestData struct { username string password string enableOAuth bool + oauthTokenURI string + scope string clientID string clientSecret string } @@ -43,11 +44,9 @@ type pulsarMetricIdentifier struct { // A complete valid authParams example for sasl, with username and passwd var validPulsarWithAuthParams = map[string]string{ - "cert": "certdata", - "key": "keydata", - "ca": "cadata", - "clientID": "clientIDdata", - "clientSecret": "clientSecretdata", + "cert": "certdata", + "key": "keydata", + "ca": "cadata", } // A complete valid authParams example for sasl, without username and passwd @@ -55,46 +54,47 @@ var validPulsarWithoutAuthParams = map[string]string{} var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{ // failure, no adminURL - {map[string]string{}, true, false, false, "", "", "", nil}, - {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", "", nil}, - {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", "", nil}, - {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", "", nil}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic"}, true, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "", nil}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub1", nil}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub2"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub2", nil}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub3"}, false, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub3", nil}, - {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1", nil}, - {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1", nil}, - {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "isPartitionedTopic": "true", "subscription": "sub1"}, false, false, true, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1", nil}, + {map[string]string{}, true, false, false, "", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80"}, true, false, false, "http://172.20.0.151:80", "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic"}, true, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub1"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub2"}, false, true, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub2"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub3"}, false, false, false, "http://172.20.0.151:80", "persistent://public/default/my-topic", "sub3"}, + {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"}, + {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"}, + {map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "isPartitionedTopic": "true", "subscription": "sub1"}, false, false, true, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"}, // tls - {map[string]string{"adminURL": "https://localhost:8443", "tls": "enable", "cert": "certdata", "key": "keydata", "ca": "cadata", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1", nil}, - - // oauth - {map[string]string{"adminURL": "https://localhost:8443", "authModes": "oauth", "grantType": "client_credentials", "authTokenURI": "https://localhost/token", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1", map[string]string{"grantType": "client_credentials", "authTokenURI": "https://localhost/token"}}, - {map[string]string{"adminURL": "https://localhost:8443", "authModes": "oauth", "grantType": "client_credentials", "authTokenURI": "https://localhost/token", "scope": "sw:scope1", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1", map[string]string{"grantType": "client_credentials", "authTokenURI": "https://localhost/token", "scope": "sw:scope1"}}, - {map[string]string{"adminURL": "https://localhost:8443", "authModes": "oauth", "grantType": "client_credentials", "authTokenURI": "https://localhost/token", "scope": "sw:scope1 sw:scope2", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1", map[string]string{"grantType": "client_credentials", "authTokenURI": "https://localhost/token", "scope": "sw:scope1 sw:scope2"}}, + {map[string]string{"adminURL": "https://localhost:8443", "tls": "enable", "cert": "certdata", "key": "keydata", "ca": "cadata", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1"}, } var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{ // Passes, mutual TLS, no other auth (legacy "tls: enable") - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, // Passes, mutual TLS, no other auth (uses new way to enable tls) - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, // Fails, mutual TLS (legacy "tls: enable") without cert - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, // Fails, mutual TLS, (uses new way to enable tls) without cert - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""}, // Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS. // The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca. - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", "", "", ""}, // Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS. - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", "", "", ""}, + // Passes, server side TLS with oauth. Note that EnableTLS is expected to be false because it is not mTLS. - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "id1", "secret123"}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "clientID": "id2"}, map[string]string{"ca": "cadata", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "id1", "secret123"}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "clientID": "id2"}, map[string]string{"ca": "cadata", "clientID": "", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "id2", "secret123"}, - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "clientID": "id1"}, map[string]string{}, false, false, "", "", "", "", "", "", false, "id1", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https1", "scope1", "id1", "secret123"}, + // Passes, oauth config data is set from metadata only + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https2", "scope": "scope2", "clientID": "id2"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https2", "scope2", "id2", ""}, + // Passes, oauth config data is set from TriggerAuth if both provided + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123"}, + // Passes, with multiple scopes from metadata + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": "sc:scope2 sc:scope1", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""}, + // Passes, with multiple scopes from TriggerAuth + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": "sc:scope2 sc:scope1", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"}, } var pulsarMetricIdentifiers = []pulsarMetricIdentifier{ @@ -161,41 +161,20 @@ func TestParsePulsarMetadata(t *testing.T) { if meta.subscription != testData.subscription { t.Errorf("Expected subscription %s but got %s\n", testData.subscription, meta.subscription) } - - if testData.oauthData != nil { - if meta.oauthTokenURI != testData.oauthData["oauthTokenURI"] { - t.Errorf("Expected oauthTokenURI %s but got %s\n", testData.oauthData["oauthTokenURI"], meta.oauthTokenURI) - } - if meta.grantType != testData.oauthData["grantType"] { - t.Errorf("Expected grantType %s but got %s\n", testData.oauthData["grantType"], meta.grantType) - } - if testData.oauthData["scope"] != "" && !compareScope(meta.scopes, testData.oauthData["scope"]) { - t.Errorf("Expected scopes %s but got %s\n", testData.oauthData["scope"], meta.scopes) - } - if testData.oauthData["scope"] == "" && meta.scopes != nil { - t.Errorf("Expected scopes to be null but got %s\n", meta.scopes) - } - if meta.clientID != testData.oauthData["clientID"] { - t.Errorf("Expected clientID %s but got %s\n", testData.oauthData["clientID"], meta.clientID) - } - } } } func compareScope(scopes []string, scopeStr string) bool { scopeMap := make(map[string]bool) - for _, scope := range scopes { scopeMap[scope] = true } - scopeList := strings.Fields(scopeStr) for _, scope := range scopeList { if !scopeMap[scope] { return false } } - return true } @@ -265,13 +244,25 @@ func TestPulsarAuthParams(t *testing.T) { } } - if meta.clientID != testData.clientID { - t.Errorf("Expected clientID to be set to %s but got %s\n", testData.clientID, meta.clientID) + if meta.pulsarAuth.OauthTokenURI != testData.oauthTokenURI { + t.Errorf("Expected oauthTokenURI to be set to %s but got %s\n", testData.oauthTokenURI, meta.pulsarAuth.OauthTokenURI) + } + + if testData.scope != "" && !compareScope(meta.pulsarAuth.Scopes, testData.scope) { + t.Errorf("Expected scopes %s but got %s\n", testData.scope, meta.pulsarAuth.Scopes) + } + if testData.scope == "" && meta.pulsarAuth.Scopes != nil { + t.Errorf("Expected scopes to be null but got %s\n", meta.pulsarAuth.Scopes) + } + + if meta.pulsarAuth.ClientID != testData.clientID { + t.Errorf("Expected clientID to be set to %s but got %s\n", testData.clientID, meta.pulsarAuth.ClientID) } if meta.pulsarAuth.ClientSecret != testData.clientSecret { t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.pulsarAuth.ClientSecret) } + } } From 92127a9c6e077847e616ab31f0d7d867a43389d0 Mon Sep 17 00:00:00 2001 From: Ming Meng Date: Thu, 22 Jun 2023 14:54:04 -0400 Subject: [PATCH 3/6] updated per review Signed-off-by: Ming Meng --- pkg/scalers/authentication/authentication_helpers.go | 5 +++-- pkg/scalers/pulsar_scaler.go | 5 +++-- pkg/scalers/pulsar_scaler_test.go | 6 ++++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/scalers/authentication/authentication_helpers.go b/pkg/scalers/authentication/authentication_helpers.go index 207248f8e30..1707be081e4 100644 --- a/pkg/scalers/authentication/authentication_helpers.go +++ b/pkg/scalers/authentication/authentication_helpers.go @@ -94,8 +94,9 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet } out.EnableOAuth = true out.OauthTokenURI = authParams["oauthTokenURI"] - if authParams["scope"] != "" { - out.Scopes = strings.Split(authParams["scope"], " ") + scope := strings.ReplaceAll(authParams["scope"], " ", "") + if scope != "" { + out.Scopes = strings.Split(scope, ",") } out.ClientID = authParams["clientID"] out.ClientSecret = authParams["clientSecret"] diff --git a/pkg/scalers/pulsar_scaler.go b/pkg/scalers/pulsar_scaler.go index f08f4d0c83e..ec7b243b63d 100644 --- a/pkg/scalers/pulsar_scaler.go +++ b/pkg/scalers/pulsar_scaler.go @@ -205,8 +205,9 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) { if auth.OauthTokenURI == "" { auth.OauthTokenURI = config.TriggerMetadata["oauthTokenURI"] } - if auth.Scopes == nil && config.TriggerMetadata["scope"] != "" { - auth.Scopes = strings.Split(config.TriggerMetadata["scope"], " ") + scope := strings.ReplaceAll(config.TriggerMetadata["scope"], " ", "") + if auth.Scopes == nil && scope != "" { + auth.Scopes = strings.Split(scope, ",") } if auth.ClientID == "" { auth.ClientID = config.TriggerMetadata["clientID"] diff --git a/pkg/scalers/pulsar_scaler_test.go b/pkg/scalers/pulsar_scaler_test.go index 11f8943aa62..213fad4b0c0 100644 --- a/pkg/scalers/pulsar_scaler_test.go +++ b/pkg/scalers/pulsar_scaler_test.go @@ -92,9 +92,11 @@ var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{ // Passes, oauth config data is set from TriggerAuth if both provided {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123"}, // Passes, with multiple scopes from metadata - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": "sc:scope2 sc:scope1", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, sc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""}, // Passes, with multiple scopes from TriggerAuth - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": "sc:scope2 sc:scope1", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, sc:scope1 ", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"}, + // Passes, invalid scopes provided + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " ", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"}, } var pulsarMetricIdentifiers = []pulsarMetricIdentifier{ From 5211f81447709ccb364b98b8e480a732e87866ef Mon Sep 17 00:00:00 2001 From: Ming Meng <101287520+mingmcb@users.noreply.github.com> Date: Thu, 22 Jun 2023 15:40:49 -0400 Subject: [PATCH 4/6] Update CHANGELOG.md Signed-off-by: Ming Meng <101287520+mingmcb@users.noreply.github.com> --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 136416949a6..50eb5777c18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -142,7 +142,6 @@ New deprecation(s): - **General**: Trying to prevent operator crash when accessing `ScaledObject.Status.ScaleTargetGVKR` ([#4389](https://github.com/kedacore/keda/issues/4389)) - **General**: Use default metrics provider from `sigs.k8s.io/custom-metrics-apiserver` ([#4473](https://github.com/kedacore/keda/pull/4473)) - ## v2.10.1 ### Fixes From 1f809b95809bb51e943f81aaccb8440d55aa054d Mon Sep 17 00:00:00 2001 From: Ming Meng Date: Thu, 6 Jul 2023 14:17:18 -0500 Subject: [PATCH 5/6] updated per review Signed-off-by: Ming Meng --- CHANGELOG.md | 2 +- .../authentication/authentication_helpers.go | 24 +++++++++++++++---- .../authentication/authentication_types.go | 2 +- pkg/scalers/pulsar_scaler.go | 5 ++-- pkg/scalers/pulsar_scaler_test.go | 8 ++++--- 5 files changed, 29 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fffde6a9186..08df0071668 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - [v1.0.0](#v100) ## Unreleased +- **Pulsar Scaler**: Add support for OAuth extensions ([#4700](https://github.com/kedacore/keda/issues/4700)) ### New @@ -140,7 +141,6 @@ None. - **RabbitMQ Scaler**: Add support for `unsafeSsl` in trigger metadata ([#4448](https://github.com/kedacore/keda/issues/4448)) - **RabbitMQ Scaler**: Add support for `workloadIdentityResource` and utilize AzureAD Workload Identity for HTTP authorization ([#4716](https://github.com/kedacore/keda/issues/4716)) - **Solace Scaler**: Add new `messageReceiveRateTarget` metric to Solace Scaler ([#4665](https://github.com/kedacore/keda/issues/4665)) -- **Pulsar Scaler**: Add support for OAuth extensions ([#4700](https://github.com/kedacore/keda/issues/4700)) ### Fixes diff --git a/pkg/scalers/authentication/authentication_helpers.go b/pkg/scalers/authentication/authentication_helpers.go index 1707be081e4..be892df159a 100644 --- a/pkg/scalers/authentication/authentication_helpers.go +++ b/pkg/scalers/authentication/authentication_helpers.go @@ -94,10 +94,7 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet } out.EnableOAuth = true out.OauthTokenURI = authParams["oauthTokenURI"] - scope := strings.ReplaceAll(authParams["scope"], " ", "") - if scope != "" { - out.Scopes = strings.Split(scope, ",") - } + out.Scopes = ParseScope(authParams["scope"]) out.ClientID = authParams["clientID"] out.ClientSecret = authParams["clientSecret"] default: @@ -112,6 +109,25 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet return out, err } +func ParseScope(inputStr string) []string { + scope := strings.TrimSpace(inputStr) + if scope != "" { + scopes := make([]string, 0) + list := strings.Split(scope, ",") + for _, sc := range list { + sc := strings.TrimSpace(sc) + if sc != "" { + scopes = append(scopes, sc) + } + } + if len(scopes) == 0 { + return nil + } + return scopes + } + return nil +} + func GetBearerToken(auth *AuthMeta) string { return fmt.Sprintf("Bearer %s", auth.BearerToken) } diff --git a/pkg/scalers/authentication/authentication_types.go b/pkg/scalers/authentication/authentication_types.go index 7761c5371a6..66dc12b677c 100644 --- a/pkg/scalers/authentication/authentication_types.go +++ b/pkg/scalers/authentication/authentication_types.go @@ -16,7 +16,7 @@ const ( BearerAuthType Type = "bearer" // CustomAuthType is an auth type using a custom header CustomAuthType Type = "custom" - // OAuthType is a auth type using a oAuth2 + // OAuthType is an auth type using a oAuth2 OAuthType Type = "oauth" ) diff --git a/pkg/scalers/pulsar_scaler.go b/pkg/scalers/pulsar_scaler.go index d9d92abda7b..b862b2cb7bd 100644 --- a/pkg/scalers/pulsar_scaler.go +++ b/pkg/scalers/pulsar_scaler.go @@ -217,9 +217,8 @@ func parsePulsarMetadata(config *ScalerConfig, logger logr.Logger) (pulsarMetada if auth.OauthTokenURI == "" { auth.OauthTokenURI = config.TriggerMetadata["oauthTokenURI"] } - scope := strings.ReplaceAll(config.TriggerMetadata["scope"], " ", "") - if auth.Scopes == nil && scope != "" { - auth.Scopes = strings.Split(scope, ",") + if auth.Scopes == nil { + auth.Scopes = authentication.ParseScope(config.TriggerMetadata["scope"]) } if auth.ClientID == "" { auth.ClientID = config.TriggerMetadata["clientID"] diff --git a/pkg/scalers/pulsar_scaler_test.go b/pkg/scalers/pulsar_scaler_test.go index 45d5cfc0ee1..e614a546664 100644 --- a/pkg/scalers/pulsar_scaler_test.go +++ b/pkg/scalers/pulsar_scaler_test.go @@ -99,11 +99,13 @@ var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{ // Passes, oauth config data is set from TriggerAuth if both provided {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123"}, // Passes, with multiple scopes from metadata - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, sc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, \tsc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""}, // Passes, with multiple scopes from TriggerAuth - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, sc:scope1 ", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, \tsc:scope1 \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"}, + // Passes, no scope provided + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"}, // Passes, invalid scopes provided - {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " ", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"}, + {map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"}, } var pulsarMetricIdentifiers = []pulsarMetricIdentifier{ From 0472c41714ddd41c62644c9ab953aebf7bbbf7ec Mon Sep 17 00:00:00 2001 From: Ming Meng Date: Fri, 15 Sep 2023 10:54:04 -0400 Subject: [PATCH 6/6] updated per review Signed-off-by: Ming Meng --- CHANGELOG.md | 4 +-- .../authentication/authentication_helpers.go | 2 ++ pkg/scalers/pulsar_scaler.go | 6 +++++ pkg/scalers/pulsar_scaler_test.go | 25 ++++++++++++++++++- 4 files changed, 34 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b8130f9d6f..e144e353289 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,7 +46,6 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - [v1.0.0](#v100) ## Unreleased -- **Pulsar Scaler**: Add support for OAuth extensions ([#4700](https://github.com/kedacore/keda/issues/4700)) ### New @@ -64,6 +63,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **General**: Updated AWS SDK and updated all the aws scalers ([#4905](https://github.com/kedacore/keda/issues/4905)) - **Azure Pod Identity**: Introduce validation to prevent usage of empty identity ID for Azure identity providers ([#4528](https://github.com/kedacore/keda/issues/4528)) - **Prometheus Scaler**: Remove trailing whitespaces in customAuthHeader and customAuthValue ([#4960](https://github.com/kedacore/keda/issues/4960)) +- **Pulsar Scaler**: Add support for OAuth extensions ([#4700](https://github.com/kedacore/keda/issues/4700)) ### Fixes - **RabbitMQ Scaler**: Allow subpaths along with vhost in connection string ([#2634](https://github.com/kedacore/keda/issues/2634)) @@ -85,9 +85,9 @@ New deprecation(s): ### Other - **General**: Fixed a typo in the StatefulSet scaling resolver ([#4902](https://github.com/kedacore/keda/pull/4902)) +- **General**: In Metrics server show only logs with a severity level of ERROR or higher in the stderr ([#4049](https://github.com/kedacore/keda/issues/4049)) - **General**: Refactor ScaledJob related methods to be located at scale_handler ([#4781](https://github.com/kedacore/keda/issues/4781)) - **General**: Replace deprecated `set-output` command with environment file ([#4914](https://github.com/kedacore/keda/issues/4914)) -- **General**: In Metrics server show only logs with a severity level of ERROR or higher in the stderr ([#4049](https://github.com/kedacore/keda/issues/4049)) ## v2.11.2 diff --git a/pkg/scalers/authentication/authentication_helpers.go b/pkg/scalers/authentication/authentication_helpers.go index f3036dc7d56..e56485e659c 100644 --- a/pkg/scalers/authentication/authentication_helpers.go +++ b/pkg/scalers/authentication/authentication_helpers.go @@ -109,6 +109,8 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet return out, err } +// ParseScope parse OAuth scopes from a comma separated string +// whitespace is trimmed func ParseScope(inputStr string) []string { scope := strings.TrimSpace(inputStr) if scope != "" { diff --git a/pkg/scalers/pulsar_scaler.go b/pkg/scalers/pulsar_scaler.go index b862b2cb7bd..7e4442b84f6 100644 --- a/pkg/scalers/pulsar_scaler.go +++ b/pkg/scalers/pulsar_scaler.go @@ -9,6 +9,7 @@ import ( "net/http" "strconv" "strings" + "time" "github.com/go-logr/logr" "golang.org/x/oauth2/clientcredentials" @@ -223,6 +224,11 @@ func parsePulsarMetadata(config *ScalerConfig, logger logr.Logger) (pulsarMetada if auth.ClientID == "" { auth.ClientID = config.TriggerMetadata["clientID"] } + // client_secret is not required for mtls OAuth(RFC8705) + // set secret to random string to work around the Go OAuth lib + if auth.ClientSecret == "" { + auth.ClientSecret = time.Now().String() + } } meta.pulsarAuth = auth meta.scalerIndex = config.ScalerIndex diff --git a/pkg/scalers/pulsar_scaler_test.go b/pkg/scalers/pulsar_scaler_test.go index e614a546664..8fc953e5332 100644 --- a/pkg/scalers/pulsar_scaler_test.go +++ b/pkg/scalers/pulsar_scaler_test.go @@ -267,6 +267,25 @@ func TestPulsarAuthParams(t *testing.T) { if meta.pulsarAuth.Password != testData.password { t.Errorf("Expected password to be set to %s but got %s\n", testData.password, meta.pulsarAuth.Password) } + } +} + +func TestPulsarOAuthParams(t *testing.T) { + for _, testData := range parsePulsarMetadataTestAuthTLSDataset { + logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, "test_pulsar_scaler") + meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, logger) + + if err != nil && !testData.isError { + t.Error("Expected success but got error", testData.authParams, err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + + if meta.pulsarAuth == nil { + t.Log("meta.pulsarAuth is nil, skipping rest of validation of", testData) + continue + } if meta.pulsarAuth.EnableOAuth != (testData.clientID != "" || testData.clientSecret != "") { if testData.clientID != "" { @@ -292,7 +311,11 @@ func TestPulsarAuthParams(t *testing.T) { t.Errorf("Expected clientID to be set to %s but got %s\n", testData.clientID, meta.pulsarAuth.ClientID) } - if meta.pulsarAuth.ClientSecret != testData.clientSecret { + if meta.pulsarAuth.EnableOAuth && meta.pulsarAuth.ClientSecret == "" { + t.Errorf("Expected clientSecret not to be empty.\n") + } + + if testData.clientSecret != "" && strings.Compare(meta.pulsarAuth.ClientSecret, testData.clientSecret) != 0 { t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.pulsarAuth.ClientSecret) } }