Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support OAuth for pulsar scaler #4709

Merged
merged 12 commits into from
Sep 17, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- [v1.0.0](#v100)

## Unreleased
- **Pulsar Scaler**: Add support for OAuth extensions ([#4700](https://github.com/kedacore/keda/issues/4700))
zroubalik marked this conversation as resolved.
Show resolved Hide resolved

### New

Expand Down
40 changes: 38 additions & 2 deletions pkg/scalers/authentication/authentication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
if out.EnableBasicAuth {
return nil, errors.New("both bearer and basic authentication can not be set")
}

out.BearerToken = authParams["bearerToken"]
if out.EnableOAuth {
return nil, errors.New("both bearer and OAuth can not be set")
}
out.BearerToken = strings.TrimSuffix(authParams["bearerToken"], "\n")
out.EnableBearerAuth = true
case BasicAuthType:
if len(authParams["username"]) == 0 {
Expand All @@ -51,6 +53,9 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
if out.EnableBearerAuth {
return nil, errors.New("both bearer and basic authentication can not be set")
}
if out.EnableOAuth {
return nil, errors.New("both bearer and OAuth can not be set")
}

out.Username = authParams["username"]
// password is optional. For convenience, many application implement basic auth with
Expand Down Expand Up @@ -80,6 +85,18 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
}
out.CustomAuthValue = authParams["customAuthValue"]
out.EnableCustomAuth = true
case OAuthType:
if out.EnableBasicAuth {
return nil, errors.New("both oauth and basic authentication can not be set")
}
if out.EnableBearerAuth {
return nil, errors.New("both oauth and bearer authentication can not be set")
}
out.EnableOAuth = true
out.OauthTokenURI = authParams["oauthTokenURI"]
out.Scopes = ParseScope(authParams["scope"])
out.ClientID = authParams["clientID"]
out.ClientSecret = authParams["clientSecret"]
default:
return nil, fmt.Errorf("incorrect value for authMode is given: %s", t)
}
Expand All @@ -92,6 +109,25 @@ func GetAuthConfigs(triggerMetadata, authParams map[string]string) (out *AuthMet
return out, err
}

func ParseScope(inputStr string) []string {
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
scope := strings.TrimSpace(inputStr)
if scope != "" {
scopes := make([]string, 0)
list := strings.Split(scope, ",")
for _, sc := range list {
sc := strings.TrimSpace(sc)
if sc != "" {
scopes = append(scopes, sc)
}
}
if len(scopes) == 0 {
return nil
}
return scopes
}
return nil
}

func GetBearerToken(auth *AuthMeta) string {
return fmt.Sprintf("Bearer %s", auth.BearerToken)
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/scalers/authentication/authentication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const (
BearerAuthType Type = "bearer"
// CustomAuthType is an auth type using a custom header
CustomAuthType Type = "custom"
// OAuthType is an auth type using a oAuth2
OAuthType Type = "oauth"
)

// TransportType is type of http transport
Expand All @@ -42,6 +44,13 @@ type AuthMeta struct {
Key string
CA string

// oAuth2
EnableOAuth bool
OauthTokenURI string
Scopes []string
ClientID string
ClientSecret string

// custom auth header
EnableCustomAuth bool
CustomAuthHeader string
Expand Down
25 changes: 24 additions & 1 deletion pkg/scalers/pulsar_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"

"github.com/go-logr/logr"
"golang.org/x/oauth2/clientcredentials"
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/metrics/pkg/apis/external_metrics"
Expand Down Expand Up @@ -211,6 +212,18 @@ func parsePulsarMetadata(config *ScalerConfig, logger logr.Logger) (pulsarMetada
if err != nil {
return meta, fmt.Errorf("error parsing %s: %w", msgBacklogMetricName, err)
}

if auth != nil && auth.EnableOAuth {
if auth.OauthTokenURI == "" {
auth.OauthTokenURI = config.TriggerMetadata["oauthTokenURI"]
}
if auth.Scopes == nil {
auth.Scopes = authentication.ParseScope(config.TriggerMetadata["scope"])
}
if auth.ClientID == "" {
auth.ClientID = config.TriggerMetadata["clientID"]
}
}
meta.pulsarAuth = auth
meta.scalerIndex = config.ScalerIndex
return meta, nil
Expand All @@ -224,9 +237,19 @@ func (s *pulsarScaler) GetStats(ctx context.Context) (*pulsarStats, error) {
return nil, fmt.Errorf("error requesting stats from admin url: %w", err)
}

client := s.client
if s.metadata.pulsarAuth.EnableOAuth {
config := clientcredentials.Config{
ClientID: s.metadata.pulsarAuth.ClientID,
ClientSecret: s.metadata.pulsarAuth.ClientSecret,
TokenURL: s.metadata.pulsarAuth.OauthTokenURI,
Scopes: s.metadata.pulsarAuth.Scopes,
}
client = config.Client(context.Background())
}
addAuthHeaders(req, &s.metadata)

res, err := s.client.Do(req)
res, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("error requesting stats from admin url: %w", err)
}
Expand Down
81 changes: 74 additions & 7 deletions pkg/scalers/pulsar_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ type parsePulsarAuthParamsTestData struct {
bearerToken string
username string
password string
enableOAuth bool
oauthTokenURI string
scope string
clientID string
clientSecret string
}

type pulsarMetricIdentifier struct {
Expand Down Expand Up @@ -74,18 +79,33 @@ var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{

var parsePulsarMetadataTestAuthTLSDataset = []parsePulsarAuthParamsTestData{
// Passes, mutual TLS, no other auth (legacy "tls: enable")
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Passes, mutual TLS, no other auth (uses new way to enable tls)
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "keydata", "ca": "cadata"}, false, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Fails, mutual TLS (legacy "tls: enable") without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable"}, map[string]string{"cert": "", "key": "keydata", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Fails, mutual TLS, (uses new way to enable tls) without cert
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "tls"}, map[string]string{"cert": "certdata", "key": "", "ca": "cadata"}, true, true, "certdata", "keydata", "cadata", "", "", "", false, "", "", "", ""},
// Passes, server side TLS with bearer token. Note that EnableTLS is expected to be false because it is not mTLS.
// The legacy behavior required tls: enable in order to configure a custom root ca. Now, all that is required is configuring a root ca.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", ""},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "tls": "enable", "authModes": "bearer"}, map[string]string{"ca": "cadata", "bearerToken": "my-special-token"}, false, false, "", "", "cadata", "my-special-token", "", "", false, "", "", "", ""},
// Passes, server side TLS with basic auth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123"},
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "basic"}, map[string]string{"ca": "cadata", "username": "admin", "password": "password123"}, false, false, "", "", "cadata", "", "admin", "password123", false, "", "", "", ""},

// Passes, server side TLS with oauth. Note that EnableTLS is expected to be false because it is not mTLS.
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https1", "scope1", "id1", "secret123"},
// Passes, oauth config data is set from metadata only
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https2", "scope": "scope2", "clientID": "id2"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https2", "scope2", "id2", ""},
// Passes, oauth config data is set from TriggerAuth if both provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https1", "scope": "scope1", "clientID": "id1"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https3", "scope": "scope3", "clientID": "id3", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https3", "scope3", "id3", "secret123"},
// Passes, with multiple scopes from metadata
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "oauthTokenURI": "https4", "scope": " sc:scope2, \tsc:scope1 ", "clientID": "id4"}, map[string]string{"ca": "cadata", "oauthTokenURI": "", "scope": "", "clientID": "", "clientSecret": ""}, false, false, "", "", "cadata", "", "", "", false, "https4", "sc:scope1 sc:scope2", "id4", ""},
// Passes, with multiple scopes from TriggerAuth
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " sc:scope2, \tsc:scope1 \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "sc:scope1 sc:scope2", "id5", "secret123"},
// Passes, no scope provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth"}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"},
// Passes, invalid scopes provided
{map[string]string{"adminURL": "http://172.20.0.151:80", "topic": "persistent://public/default/my-topic", "subscription": "sub1", "authModes": "oauth", "scope": " "}, map[string]string{"ca": "cadata", "oauthTokenURI": "https5", "scope": " , \n", "clientID": "id5", "clientSecret": "secret123"}, false, false, "", "", "cadata", "", "", "", false, "https5", "", "id5", "secret123"},
}

var pulsarMetricIdentifiers = []pulsarMetricIdentifier{
Expand Down Expand Up @@ -176,6 +196,20 @@ func TestParsePulsarMetadata(t *testing.T) {
}
}

func compareScope(scopes []string, scopeStr string) bool {
scopeMap := make(map[string]bool)
for _, scope := range scopes {
scopeMap[scope] = true
}
scopeList := strings.Fields(scopeStr)
for _, scope := range scopeList {
if !scopeMap[scope] {
return false
}
}
return true
}

func TestPulsarAuthParams(t *testing.T) {
for _, testData := range parsePulsarMetadataTestAuthTLSDataset {
logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, "test_pulsar_scaler")
Expand Down Expand Up @@ -218,7 +252,12 @@ func TestPulsarAuthParams(t *testing.T) {
}

if meta.pulsarAuth.EnableBasicAuth != (testData.username != "" || testData.password != "") {
t.Errorf("Expected EnableBearerAuth to be true when bearerToken is %s\n", testData.bearerToken)
if testData.username != "" {
t.Errorf("Expected EnableBasicAuth to be true when username is %s\n", testData.username)
}
if testData.password != "" {
t.Errorf("Expected EnableBasicAuth to be true when password is %s\n", testData.password)
}
}

if meta.pulsarAuth.Username != testData.username {
Expand All @@ -228,6 +267,34 @@ func TestPulsarAuthParams(t *testing.T) {
if meta.pulsarAuth.Password != testData.password {
t.Errorf("Expected password to be set to %s but got %s\n", testData.password, meta.pulsarAuth.Password)
}

if meta.pulsarAuth.EnableOAuth != (testData.clientID != "" || testData.clientSecret != "") {
if testData.clientID != "" {
t.Errorf("Expected EnableOAuth to be true when clientID is %s\n", testData.clientID)
}
if testData.clientSecret != "" {
t.Errorf("Expected EnableOAuth to be true when clientSecret is %s\n", testData.clientSecret)
}
}

if meta.pulsarAuth.OauthTokenURI != testData.oauthTokenURI {
t.Errorf("Expected oauthTokenURI to be set to %s but got %s\n", testData.oauthTokenURI, meta.pulsarAuth.OauthTokenURI)
}

if testData.scope != "" && !compareScope(meta.pulsarAuth.Scopes, testData.scope) {
t.Errorf("Expected scopes %s but got %s\n", testData.scope, meta.pulsarAuth.Scopes)
}
if testData.scope == "" && meta.pulsarAuth.Scopes != nil {
t.Errorf("Expected scopes to be null but got %s\n", meta.pulsarAuth.Scopes)
}

if meta.pulsarAuth.ClientID != testData.clientID {
t.Errorf("Expected clientID to be set to %s but got %s\n", testData.clientID, meta.pulsarAuth.ClientID)
}

if meta.pulsarAuth.ClientSecret != testData.clientSecret {
t.Errorf("Expected clientSecret to be set to %s but got %s\n", testData.clientSecret, meta.pulsarAuth.ClientSecret)
}
}
}

Expand Down
Loading