Skip to content

Commit

Permalink
Add pageSize (using regex) in RabbitMQ Scaler (kedacore#2162)
Browse files Browse the repository at this point in the history
Signed-off-by: jorturfer <jorge_turrado@hotmail.es>
Signed-off-by: nilayasiktoprak <nilayasiktoprak@gmail.com>
  • Loading branch information
Jorge Turrado Ferrero authored and nilayasiktoprak committed Oct 23, 2021
1 parent 7e2683c commit b325d7b
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- Add support to get connection data from Trigger Authorization in MSSQL Scaler ([#2112](https://github.com/kedacore/keda/pull/2112))
- Add support to get connection data from Trigger Authorization in PostgreSQL Scaler ([#2114](https://github.com/kedacore/keda/pull/2114))
- Add support to provide the metric name in Azure Log Analytics Scaler ([#2106](https://github.com/kedacore/keda/pull/2106))
- Add `pageSize` (using regex) in RabbitMQ Scaler ([#2162](https://github.com/kedacore/keda/pull/2162))

### Breaking Changes

Expand Down
19 changes: 17 additions & 2 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type rabbitMQMetadata struct {
protocol string // either http or amqp protocol
vhostName *string // override the vhost from the connection info
useRegex bool // specify if the queueName contains a rexeg
pageSize int // specify the page size if useRegex is enabled
operation string // specify the operation to apply in case of multiples queues
metricName string // custom metric name for trigger
timeout time.Duration // custom http timeout for a specific trigger
Expand Down Expand Up @@ -197,6 +198,20 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) {
meta.useRegex = useRegex
}

// Resolve pageSize
if val, ok := config.TriggerMetadata["pageSize"]; ok {
pageSize, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("pageSize has invalid value")
}
meta.pageSize = int(pageSize)
if meta.pageSize < 1 {
return nil, fmt.Errorf("pageSize should be 1 or greater than 1")
}
} else {
meta.pageSize = 100
}

// Resolve operation
meta.operation = defaultOperation
if val, ok := config.TriggerMetadata["operation"]; ok {
Expand Down Expand Up @@ -412,9 +427,9 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
parsedURL.Path = ""
var getQueueInfoManagementURI string
if s.metadata.useRegex {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?page=1&use_regex=true&pagination=false&name=", url.QueryEscape(s.metadata.queueName))
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues?page=1&use_regex=true&pagination=false&name=%s&page_size=%d", parsedURL.String(), url.QueryEscape(s.metadata.queueName), s.metadata.pageSize)
} else {
getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, url.QueryEscape(s.metadata.queueName))
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues%s/%s", parsedURL.String(), vhost, url.QueryEscape(s.metadata.queueName))
}

var info queueInfo
Expand Down
72 changes: 70 additions & 2 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "timeout": "error"}, true, map[string]string{}},
// amqp timeout
{map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "amqp://", "timeout": "10"}, true, map[string]string{}},
// valid pageSize
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "100"}, false, map[string]string{}},
// pageSize less than 1
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "-1"}, true, map[string]string{}},
// invalid pageSize
{map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "a"}, true, map[string]string{}},
}

var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{
Expand Down Expand Up @@ -321,7 +327,7 @@ var testRegexQueueInfoTestData = []getQueueInfoTestData{
func TestGetQueueInfoWithRegex(t *testing.T) {
for _, testData := range testRegexQueueInfoTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24"
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24&page_size=100"
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down Expand Up @@ -378,6 +384,68 @@ func TestGetQueueInfoWithRegex(t *testing.T) {
}
}

type getRegexPageSizeTestData struct {
queueInfo getQueueInfoTestData
pageSize int
}

var testRegexPageSizeTestData = []getRegexPageSizeTestData{
{testRegexQueueInfoTestData[0], 100},
{testRegexQueueInfoTestData[0], 200},
{testRegexQueueInfoTestData[0], 500},
}

func TestGetPageSizeWithRegex(t *testing.T) {
for _, testData := range testRegexPageSizeTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := fmt.Sprintf("/api/queues?page=1&use_regex=true&pagination=false&name=%%5Eevaluate_trials%%24&page_size=%d", testData.pageSize)
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}

w.WriteHeader(testData.queueInfo.responseStatus)
_, err := w.Write([]byte(testData.queueInfo.response))
if err != nil {
t.Error("Expect request path to =", testData.queueInfo.response, "but it is", err)
}
}))

resolvedEnv := map[string]string{host: fmt.Sprintf("%s%s", apiStub.URL, testData.queueInfo.vhostPath), "plainHost": apiStub.URL}

metadata := map[string]string{
"queueName": "^evaluate_trials$",
"hostFromEnv": host,
"protocol": "http",
"useRegex": "true",
"pageSize": fmt.Sprint(testData.pageSize),
}

s, err := NewRabbitMQScaler(
&ScalerConfig{
ResolvedEnv: resolvedEnv,
TriggerMetadata: metadata,
AuthParams: map[string]string{},
GlobalHTTPTimeout: 1000 * time.Millisecond,
},
)

if err != nil {
t.Error("Expect success", err)
}

ctx := context.TODO()
active, err := s.IsActive(ctx)

if err != nil {
t.Error("Expect success", err)
}

if !active {
t.Error("Expect to be active")
}
}
}

func TestRabbitMQGetMetricSpecForScaling(t *testing.T) {
for _, testData := range rabbitMQMetricIdentifiers {
meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil})
Expand Down Expand Up @@ -452,7 +520,7 @@ var testRegexQueueInfoNavigationTestData = []getQueueInfoNavigationTestData{
func TestRegexQueueMissingError(t *testing.T) {
for _, testData := range testRegexQueueInfoNavigationTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=evaluate_trials"
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=evaluate_trials&page_size=100"
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down

0 comments on commit b325d7b

Please sign in to comment.