Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include vhost for RabbitMQ when retrieving queue info with useRegex #2591

Merged
merged 11 commits into from
Mar 4, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
- **Azure Queue:** Don't call Azure queue GetProperties API unnecessarily ([#2613](https://github.com/kedacore/keda/pull/2613))
- **Datadog Scaler:** Validate query to contain `{` to prevent panic on invalid query ([#2625](https://github.com/kedacore/keda/issues/2625))
- **Kafka Scaler** Make "disable" a valid value for tls auth parameter ([#2608](https://github.com/kedacore/keda/issues/2608))
- **RabbitMQ Scaler:** Include `vhost` for RabbitMQ when retrieving queue info with `useRegex` ([#2498](https://github.com/kedacore/keda/issues/2498))

### Breaking Changes

Expand Down
24 changes: 20 additions & 4 deletions pkg/scalers/rabbitmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
rabbitModeMessageRate = "MessageRate"
defaultRabbitMQQueueLength = 20
rabbitMetricType = "External"
rabbitRootVhostPath = "/%2F"
)

const (
Expand Down Expand Up @@ -412,21 +413,36 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) {
return nil, err
}

// Extract vhost from URL's path.
vhost := parsedURL.Path

// If the URL's path only contains a slash, it represents the trailing slash and
// must be ignored because it may cause confusion with the '/' vhost.
if vhost == "/" {
vhost = ""
}

// Override vhost if requested.
if s.metadata.vhostName != nil {
vhost = "/" + url.QueryEscape(*s.metadata.vhostName)
// If the desired vhost is "All" vhosts, no path is necessary
if *s.metadata.vhostName == "" {
vhost = ""
} else {
vhost = "/" + url.QueryEscape(*s.metadata.vhostName)
}
}

if vhost == "" || vhost == "/" || vhost == "//" {
vhost = "/%2F"
// Encode the '/' vhost if necessary.
if vhost == "//" {
vhost = rabbitRootVhostPath
}

// Clear URL path to get the correct host.
parsedURL.Path = ""

var getQueueInfoManagementURI string
if s.metadata.useRegex {
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues?page=1&use_regex=true&pagination=false&name=%s&page_size=%d", parsedURL.String(), url.QueryEscape(s.metadata.queueName), s.metadata.pageSize)
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues%s?page=1&use_regex=true&pagination=false&name=%s&page_size=%d", parsedURL.String(), vhost, url.QueryEscape(s.metadata.queueName), s.metadata.pageSize)
} else {
getQueueInfoManagementURI = fmt.Sprintf("%s/api/queues%s/%s", parsedURL.String(), vhost, url.QueryEscape(s.metadata.queueName))
}
JorTurFer marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
45 changes: 35 additions & 10 deletions pkg/scalers/rabbitmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,14 @@ var testQueueInfoTestData = []getQueueInfoTestData{
{`Password is incorrect`, http.StatusUnauthorized, false, nil, ""},
}

var vhostPathes = []string{"/myhost", "", "/", "//", "/%2F"}
var vhostPathes = []string{"/myhost", "", "/", "//", rabbitRootVhostPath}

var testQueueInfoTestDataSingleVhost = []getQueueInfoTestData{
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "//"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 1.4}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, ""},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "myhost"}, "/myhost"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, "/"},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": "/"}, rabbitRootVhostPath},
{`{"messages": 4, "messages_unacknowledged": 1, "message_stats": {"publish_details": {"rate": 0}}, "name": "evaluate_trials"}`, http.StatusOK, true, map[string]string{"hostFromEnv": "plainHost", "vhostName": ""}, "/"},
}

Expand All @@ -216,14 +216,19 @@ func TestGetQueueInfo(t *testing.T) {

for _, testData := range allTestData {
testData := testData
expectedVhost := "myhost"

if testData.vhostPath != "/myhost" {
expectedVhost = "%2F"
var expectedVhostPath string
switch testData.vhostPath {
case "/myhost":
expectedVhostPath = "/myhost"
case rabbitRootVhostPath, "//":
expectedVhostPath = rabbitRootVhostPath
default:
expectedVhostPath = ""
}

var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues/" + expectedVhost + "/evaluate_trials"
expectedPath := fmt.Sprintf("/api/queues%s/evaluate_trials", expectedVhostPath)
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down Expand Up @@ -325,10 +330,21 @@ var testRegexQueueInfoTestData = []getQueueInfoTestData{
{`{"items":[]}`, http.StatusOK, false, map[string]string{"mode": "MessageRate", "value": "1000", "useRegex": "true", "operation": "avg"}, ""},
}

var vhostPathesForRegex = []string{"", "/test-vh", rabbitRootVhostPath}

func TestGetQueueInfoWithRegex(t *testing.T) {
allTestData := []getQueueInfoTestData{}
for _, testData := range testRegexQueueInfoTestData {
for _, vhostPath := range vhostPathesForRegex {
testData := testData
testData.vhostPath = vhostPath
allTestData = append(allTestData, testData)
}
}

for _, testData := range allTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := "/api/queues?page=1&use_regex=true&pagination=false&name=%5Eevaluate_trials%24&page_size=100"
expectedPath := fmt.Sprintf("/api/queues%s?page=1&use_regex=true&pagination=false&name=%%5Eevaluate_trials%%24&page_size=100", testData.vhostPath)
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down Expand Up @@ -397,9 +413,18 @@ var testRegexPageSizeTestData = []getRegexPageSizeTestData{
}

func TestGetPageSizeWithRegex(t *testing.T) {
allTestData := []getRegexPageSizeTestData{}
for _, testData := range testRegexPageSizeTestData {
for _, vhostPath := range vhostPathesForRegex {
testData := testData
testData.queueInfo.vhostPath = vhostPath
allTestData = append(allTestData, testData)
}
}

for _, testData := range allTestData {
var apiStub = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
expectedPath := fmt.Sprintf("/api/queues?page=1&use_regex=true&pagination=false&name=%%5Eevaluate_trials%%24&page_size=%d", testData.pageSize)
expectedPath := fmt.Sprintf("/api/queues%s?page=1&use_regex=true&pagination=false&name=%%5Eevaluate_trials%%24&page_size=%d", testData.queueInfo.vhostPath, testData.pageSize)
if r.RequestURI != expectedPath {
t.Error("Expect request path to =", expectedPath, "but it is", r.RequestURI)
}
Expand Down
27 changes: 27 additions & 0 deletions tests/scalers/rabbitmq-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ export class RabbitMQHelper {
)
}

static createVhost(t, namespace: string, host: string, username: string, password: string, vhostName: string) {
const tmpFile = tmp.fileSync()
fs.writeFileSync(tmpFile.name, createVhostYaml.replace('{{HOST}}', host)
.replace('{{USERNAME_PASSWORD}}', `${username}:${password}`)
.replace('{{VHOST_NAME}}', vhostName)
.replace('{{VHOST_NAME}}', vhostName))
t.is(
0,
sh.exec(`kubectl apply -f ${tmpFile.name} --namespace ${namespace}`).code,
'creating a vhost should work.'
)
}

static publishMessages(t, namespace: string, connectionString: string, messageCount: number, queueName: string) {
// publish messages
const tmpFile = tmp.fileSync()
Expand Down Expand Up @@ -66,6 +79,20 @@ spec:
command: ["send", "{{CONNECTION_STRING}}", "{{MESSAGE_COUNT}}", "{{QUEUE_NAME}}"]
restartPolicy: Never`

const createVhostYaml = `apiVersion: batch/v1
kind: Job
metadata:
name: rabbitmq-create-vhost-{{VHOST_NAME}}
spec:
template:
spec:
containers:
- name: curl-client
image: curlimages/curl
imagePullPolicy: Always
command: ["curl", "-u", "{{USERNAME_PASSWORD}}", "-X", "PUT", "http://{{HOST}}/api/vhosts/{{VHOST_NAME}}"]
restartPolicy: Never`

const rabbitmqDeployYaml = `apiVersion: v1
kind: ConfigMap
metadata:
Expand Down
126 changes: 126 additions & 0 deletions tests/scalers/rabbitmq-queue-http-regex-vhost.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import * as async from 'async'
import * as fs from 'fs'
import * as sh from 'shelljs'
import * as tmp from 'tmp'
import test from 'ava'
import { RabbitMQHelper } from './rabbitmq-helpers'
import {waitForDeploymentReplicaCount} from "./helpers";

const testNamespace = 'rabbitmq-queue-http-regex-vhost-test'
const rabbitmqNamespace = 'rabbitmq-http-regex-vhost-test'
const queueName = 'hello'
const dummyQueueName1 = 'hello-1'
const dummyQueueName2 = 'hellohellohello'
const username = "test-user"
const password = "test-password"
const vhost = "test-vh-regex"
const dummyVhost1 = "test-vh-regex-dummy-one"
const dummyVhost2 = "test-vh-regex-dummy-two"
const connectionHost = `rabbitmq.${rabbitmqNamespace}.svc.cluster.local`
const connectionHostWithAuth = `${username}:${password}@${connectionHost}`
const connectionString = `amqp://${connectionHostWithAuth}/${vhost}`
const connectionStringDummy1 = `amqp://${connectionHostWithAuth}/${dummyVhost1}`
const connectionStringDummy2 = `amqp://${connectionHostWithAuth}/${dummyVhost2}`
const messageCount = 500

test.before(t => {
RabbitMQHelper.installRabbit(t, username, password, vhost, rabbitmqNamespace)

sh.config.silent = true
// create deployment
const httpConnectionString = `http://${connectionHostWithAuth}/${vhost}`

RabbitMQHelper.createDeployment(t, testNamespace, deployYaml, connectionString, httpConnectionString, queueName)

RabbitMQHelper.createVhost(t, testNamespace, connectionHost, username, password, dummyVhost1)
RabbitMQHelper.createVhost(t, testNamespace, connectionHost, username, password, dummyVhost2)
})

test.serial('Deployment should have 0 replicas on start', t => {
const replicaCount = sh.exec(
`kubectl get deployment.apps/test-deployment --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"`
).stdout
t.is(replicaCount, '0', 'replica count should start out as 0')
})

test.serial(`Deployment should scale to 4 with ${messageCount} messages on the queue then back to 0`, async t => {
RabbitMQHelper.publishMessages(t, testNamespace, connectionStringDummy1, messageCount, dummyQueueName1)
RabbitMQHelper.publishMessages(t, testNamespace, connectionStringDummy2, messageCount, dummyQueueName2)
RabbitMQHelper.publishMessages(t, testNamespace, connectionString, messageCount, queueName)

// with messages published, the consumer deployment should start receiving the messages
t.true(await waitForDeploymentReplicaCount(4, 'test-deployment', testNamespace, 20, 5000), 'Replica count should be 4 after 10 seconds')
t.true(await waitForDeploymentReplicaCount(0, 'test-deployment', testNamespace, 50, 5000), 'Replica count should be 0 after 3 minutes')
})

test.after.always.cb('clean up rabbitmq-queue deployment', t => {
const resources = [
'scaledobject.keda.sh/test-scaledobject',
'secret/test-secrets-api',
'deployment.apps/test-deployment',
]

for (const resource of resources) {
sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`)
}
sh.exec(`kubectl delete namespace ${testNamespace}`)
// remove rabbitmq
RabbitMQHelper.uninstallRabbit(rabbitmqNamespace)
t.end()
})

const deployYaml = `apiVersion: v1
kind: Secret
metadata:
name: test-secrets-api
data:
RabbitApiHost: {{CONNECTION_STRING_BASE64}}
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: test-deployment
labels:
app: test-deployment
spec:
replicas: 0
selector:
matchLabels:
app: test-deployment
template:
metadata:
labels:
app: test-deployment
spec:
containers:
- name: rabbitmq-consumer
image: ghcr.io/kedacore/tests-rabbitmq
imagePullPolicy: Always
command:
- receive
args:
- '{{CONNECTION_STRING}}'
envFrom:
- secretRef:
name: test-secrets-api
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: test-scaledobject
spec:
scaleTargetRef:
name: test-deployment
pollingInterval: 5
cooldownPeriod: 10
minReplicaCount: 0
maxReplicaCount: 4
triggers:
- type: rabbitmq
metadata:
queueName: "^hell.{1}$"
hostFromEnv: RabbitApiHost
protocol: http
useRegex: 'true'
operation: sum
queueLength: '50'`