diff --git a/CHANGELOG.md b/CHANGELOG.md index 310cfc2905c..310b56b6fbc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ - Add support to get connection data from Trigger Authorization in MSSQL Scaler ([#2112](https://github.com/kedacore/keda/pull/2112)) - Add support to get connection data from Trigger Authorization in PostgreSQL Scaler ([#2114](https://github.com/kedacore/keda/pull/2114)) - Add support to provide the metric name in Azure Log Analytics Scaler ([#2106](https://github.com/kedacore/keda/pull/2106)) +- Add `pageSize` (using regex) in RabbitMQ Scaler ([#2162](https://github.com/kedacore/keda/pull/2162)) ### Breaking Changes diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index 25017ecd125..2f379cef8ac 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -67,6 +67,7 @@ type rabbitMQMetadata struct { protocol string // either http or amqp protocol vhostName *string // override the vhost from the connection info useRegex bool // specify if the queueName contains a rexeg + pageSize int // specify the page size if useRegex is enabled operation string // specify the operation to apply in case of multiples queues metricName string // custom metric name for trigger timeout time.Duration // custom http timeout for a specific trigger @@ -197,6 +198,20 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { meta.useRegex = useRegex } + // Resolve pageSize + if val, ok := config.TriggerMetadata["pageSize"]; ok { + pageSize, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return nil, fmt.Errorf("pageSize has invalid value") + } + meta.pageSize = int(pageSize) + if meta.pageSize < 1 { + return nil, fmt.Errorf("pageSize should be 1 or greater than 1") + } + } else { + meta.pageSize = 100 + } + // Resolve operation meta.operation = defaultOperation if val, ok := config.TriggerMetadata["operation"]; ok { @@ -412,9 +427,9 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) { parsedURL.Path = "" var getQueueInfoManagementURI string if s.metadata.useRegex { - getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s", parsedURL.String(), "api/queues?page=1&use_regex=true&pagination=false&name=", url.QueryEscape(s.metadata.queueName)) + getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues?page=1&use_regex=true&pagination=false&name=%s&page_size=%d", parsedURL.String(), url.QueryEscape(s.metadata.queueName), s.metadata.pageSize) } else { - getQueueInfoManagementURI = fmt.Sprintf("%s/%s%s/%s", parsedURL.String(), "api/queues", vhost, url.QueryEscape(s.metadata.queueName)) + getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues%s/%s", parsedURL.String(), vhost, url.QueryEscape(s.metadata.queueName)) } var info queueInfo diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index 8de324b6abf..dd43416bafa 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -104,6 +104,12 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ {map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "http://", "timeout": "error"}, true, map[string]string{}}, // amqp timeout {map[string]string{"mode": "QueueLength", "value": "1000", "queueName": "sample", "host": "amqp://", "timeout": "10"}, true, map[string]string{}}, + // valid pageSize + {map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "100"}, false, map[string]string{}}, + // pageSize less than 1 + {map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "-1"}, true, map[string]string{}}, + // invalid pageSize + {map[string]string{"mode": "MessageRate", "value": "1000", "queueName": "sample", "host": "http://", "useRegex": "true", "pageSize": "a"}, true, map[string]string{}}, } var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{ @@ -321,7 +327,7 @@ var testRegexQueueInfoTestData = []getQueueInfoTestData{ func TestGetQueueInfoWithRegex(t *testing.T) { for _, testData := range testRegexQueueInfoTestData { var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24" + expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24&page_size=100" if r.RequestURI != expectedPath { t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI) } @@ -378,6 +384,68 @@ func TestGetQueueInfoWithRegex(t *testing.T) { } } +type getRegexPageSizeTestData struct { + queueInfo getQueueInfoTestData + pageSize int +} + +var testRegexPageSizeTestData = []getRegexPageSizeTestData{ + {testRegexQueueInfoTestData[0], 100}, + {testRegexQueueInfoTestData[0], 200}, + {testRegexQueueInfoTestData[0], 500}, +} + +func TestGetPageSizeWithRegex(t *testing.T) { + for _, testData := range testRegexPageSizeTestData { + var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + expectedPath := fmt.Sprintf("/api/queues?page=1&use_regex=true&pagination=false&name=%%5Eevaluate_trials%%24&page_size=%d", testData.pageSize) + if r.RequestURI != expectedPath { + t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI) + } + + w.WriteHeader(testData.queueInfo.responseStatus) + _, err := w.Write([]byte(testData.queueInfo.response)) + if err != nil { + t.Error("Expect request path to =", testData.queueInfo.response, "but it is", err) + } + })) + + resolvedEnv := map[string]string{host: fmt.Sprintf("%s%s", apiStub.URL, testData.queueInfo.vhostPath), "plainHost": apiStub.URL} + + metadata := map[string]string{ + "queueName": "^evaluate_trials$", + "hostFromEnv": host, + "protocol": "http", + "useRegex": "true", + "pageSize": fmt.Sprint(testData.pageSize), + } + + s, err := NewRabbitMQScaler( + &ScalerConfig{ + ResolvedEnv: resolvedEnv, + TriggerMetadata: metadata, + AuthParams: map[string]string{}, + GlobalHTTPTimeout: 1000 * time.Millisecond, + }, + ) + + if err != nil { + t.Error("Expect success", err) + } + + ctx := context.TODO() + active, err := s.IsActive(ctx) + + if err != nil { + t.Error("Expect success", err) + } + + if !active { + t.Error("Expect to be active") + } + } +} + func TestRabbitMQGetMetricSpecForScaling(t *testing.T) { for _, testData := range rabbitMQMetricIdentifiers { meta, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil}) @@ -452,7 +520,7 @@ var testRegexQueueInfoNavigationTestData = []getQueueInfoNavigationTestData{ func TestRegexQueueMissingError(t *testing.T) { for _, testData := range testRegexQueueInfoNavigationTestData { var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=evaluate_trials" + expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=evaluate_trials&page_size=100" if r.RequestURI != expectedPath { t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI) }