Skip to content

Commit

Permalink
pp/tests: put shared PP endpoints in a super class
Browse files Browse the repository at this point in the history
  • Loading branch information
NyaliaLui committed Oct 5, 2022
1 parent 7ccc5a0 commit 12644bf
Showing 1 changed file with 51 additions and 75 deletions.
126 changes: 51 additions & 75 deletions tests/rptest/tests/pandaproxy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

import base64
import http.client
import json
import uuid
Expand Down Expand Up @@ -159,25 +158,28 @@ def test_get_brokers_with_stale_health_metadata(self):
backoff_sec=1)


class PandaProxyTest(RedpandaTest):
class PandaProxyEndpoints(RedpandaTest):
"""
Test pandaproxy against a redpanda cluster.
All the Pandaproxy endpoints
"""
def __init__(self, context):
super(PandaProxyTest, self).__init__(
def __init__(self, context, **kwargs):
super(PandaProxyEndpoints, self).__init__(
context,
num_brokers=3,
enable_pp=True,
extra_rp_conf={"auto_create_topics_enabled": False})
extra_rp_conf={"auto_create_topics_enabled": False},
**kwargs)

http.client.HTTPConnection.debuglevel = 1
http.client.print = lambda *args: self.logger.debug(" ".join(args))

def _base_uri(self):
return f"http://{self.redpanda.nodes[0].account.hostname}:8082"

def _get_brokers(self, headers=HTTP_GET_BROKERS_HEADERS):
return requests.get(f"{self._base_uri()}/brokers", headers=headers)
def _get_brokers(self, headers=HTTP_GET_BROKERS_HEADERS, **kwargs):
return requests.get(f"{self._base_uri()}/brokers",
headers=headers,
**kwargs)

def _create_topics(self,
names=create_topic_names(1),
Expand All @@ -193,29 +195,38 @@ def _create_topics(self,
assert set(names).issubset(self._get_topics().json())
return names

def _get_topics(self, headers=HTTP_GET_TOPICS_HEADERS):
return requests.get(f"{self._base_uri()}/topics", headers=headers)
def _get_topics(self, headers=HTTP_GET_TOPICS_HEADERS, **kwargs):
return requests.get(f"{self._base_uri()}/topics",
headers=headers,
**kwargs)

def _produce_topic(self,
topic,
data,
headers=HTTP_PRODUCE_BINARY_V2_TOPIC_HEADERS):
headers=HTTP_PRODUCE_BINARY_V2_TOPIC_HEADERS,
**kwargs):
return requests.post(f"{self._base_uri()}/topics/{topic}",
data,
headers=headers)
headers=headers,
**kwargs)

def _fetch_topic(self,
topic,
partition=0,
offset=0,
max_bytes=1024,
timeout_ms=1000,
headers=HTTP_FETCH_TOPIC_HEADERS):
headers=HTTP_FETCH_TOPIC_HEADERS,
**kwargs):
return requests.get(
f"{self._base_uri()}/topics/{topic}/partitions/{partition}/records?offset={offset}&max_bytes={max_bytes}&timeout={timeout_ms}",
headers=headers)
headers=headers,
**kwargs)

def _create_consumer(self, group_id, headers=HTTP_CREATE_CONSUMER_HEADERS):
def _create_consumer(self,
group_id,
headers=HTTP_CREATE_CONSUMER_HEADERS,
**kwargs):
res = requests.post(f"{self._base_uri()}/consumers/{group_id}",
'''
{
Expand All @@ -225,9 +236,18 @@ def _create_consumer(self, group_id, headers=HTTP_CREATE_CONSUMER_HEADERS):
"fetch.min.bytes": "1",
"consumer.request.timeout.ms": "10000"
}''',
headers=headers)
headers=headers,
**kwargs)
return res


class PandaProxyTest(PandaProxyEndpoints):
"""
Test pandaproxy against a redpanda cluster.
"""
def __init__(self, context):
super(PandaProxyTest, self).__init__(context)

def _create_named_consumer(self,
group_id,
name,
Expand Down Expand Up @@ -857,35 +877,20 @@ def test_consumer_group_json_v2(self):
assert rc_res.status_code == requests.codes.no_content


class PandaProxySASLTest(RedpandaTest):
class PandaProxySASLTest(PandaProxyEndpoints):
"""
Test pandaproxy can connect using SASL.
"""
def __init__(self, context):
extra_rp_conf = dict(auto_create_topics_enabled=False, )

security = SecurityConfig()
security.enable_sasl = True

super(PandaProxySASLTest, self).__init__(context,
num_brokers=3,
enable_pp=True,
security=security,
extra_rp_conf=extra_rp_conf)

http.client.HTTPConnection.debuglevel = 1
http.client.print = lambda *args: self.logger.debug(" ".join(args))
super(PandaProxySASLTest, self).__init__(context, security=security)

def _get_super_client(self):
user, password, _ = self.redpanda.SUPERUSER_CREDENTIALS
return KafkaCliTools(self.redpanda, user=user, passwd=password)

def _base_uri(self):
return f"http://{self.redpanda.nodes[0].account.hostname}:8082"

def _get_topics(self, headers=HTTP_GET_TOPICS_HEADERS):
return requests.get(f"{self._base_uri()}/topics", headers=headers)

@cluster(num_nodes=3)
def test_list_topics(self):
client = self._get_super_client()
Expand All @@ -907,56 +912,27 @@ def topics_appeared():
err_msg="Timeout waiting for topics to appear.")


class PandaProxyBasicAuthTest(RedpandaTest):
password = 'simple'
algorithm = 'SCRAM-SHA-256'

class PandaProxyBasicAuthTest(PandaProxyEndpoints):
def __init__(self, context):
extra_rp_conf = dict(auto_create_topics_enabled=False, )

security = SecurityConfig()
security.enable_sasl = True
security.endpoint_authn_method = 'sasl'
security.pp_authn_method = 'http_basic'

super(PandaProxyBasicAuthTest,
self).__init__(context,
num_brokers=3,
enable_pp=True,
security=security,
extra_rp_conf=extra_rp_conf)

self.admin = Admin(self.redpanda)

http.client.HTTPConnection.debuglevel = 1
http.client.print = lambda *args: self.logger.debug(" ".join(args))

def _base_uri(self):
return f"http://{self.redpanda.nodes[0].account.hostname}:8082"

def _get_brokers(self, headers=HTTP_GET_BROKERS_HEADERS):
return requests.get(f"{self._base_uri()}/brokers", headers=headers)

def encode_base64(self, username: str, password: str):
msg = f'{username}:{password}'
# The decode at the end removes bytes type
return base64.b64encode(msg.encode('ascii')).decode()
super(PandaProxyBasicAuthTest, self).__init__(context,
security=security)

@cluster(num_nodes=3)
def test_basic_auth(self):
# Regular user has no access to Kafka API
headers = {
"authorization":
"Basic " + self.encode_base64("panda", self.password)
}
brokers = self._get_brokers(headers=headers).json()
assert brokers['error_code'] == 40101
brokers = self._get_brokers(auth=('red', 'panda')).json()
assert brokers['error_code'] == 401

# Super user has full access to Kafka API
username, password, _ = self.redpanda.SUPERUSER_CREDENTIALS
headers = {
"authorization": "Basic " + self.encode_base64(username, password)
}
brokers = self._get_brokers(headers=headers).json()
brokers['brokers'].sort()
assert brokers['brokers'] == [1, 2, 3]
super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS
brokers_raw = self._get_brokers(auth=(super_username, super_password))
brokers = brokers_raw.json()['brokers']

nodes = enumerate(self.redpanda.nodes, 1)
node_idxs = [node[0] for node in nodes]

assert sorted(brokers) == sorted(node_idxs)

0 comments on commit 12644bf

Please sign in to comment.