diff --git a/tests/rptest/tests/pandaproxy_test.py b/tests/rptest/tests/pandaproxy_test.py index 353e008e5d4b4..2ad4d5e7eea50 100644 --- a/tests/rptest/tests/pandaproxy_test.py +++ b/tests/rptest/tests/pandaproxy_test.py @@ -7,7 +7,6 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 -import base64 import http.client import json import uuid @@ -159,16 +158,17 @@ def test_get_brokers_with_stale_health_metadata(self): backoff_sec=1) -class PandaProxyTest(RedpandaTest): +class PandaProxyEndpoints(RedpandaTest): """ - Test pandaproxy against a redpanda cluster. + All the Pandaproxy endpoints """ - def __init__(self, context): - super(PandaProxyTest, self).__init__( + def __init__(self, context, **kwargs): + super(PandaProxyEndpoints, self).__init__( context, num_brokers=3, enable_pp=True, - extra_rp_conf={"auto_create_topics_enabled": False}) + extra_rp_conf={"auto_create_topics_enabled": False}, + **kwargs) http.client.HTTPConnection.debuglevel = 1 http.client.print = lambda *args: self.logger.debug(" ".join(args)) @@ -176,8 +176,10 @@ def __init__(self, context): def _base_uri(self): return f"http://{self.redpanda.nodes[0].account.hostname}:8082" - def _get_brokers(self, headers=HTTP_GET_BROKERS_HEADERS): - return requests.get(f"{self._base_uri()}/brokers", headers=headers) + def _get_brokers(self, headers=HTTP_GET_BROKERS_HEADERS, **kwargs): + return requests.get(f"{self._base_uri()}/brokers", + headers=headers, + **kwargs) def _create_topics(self, names=create_topic_names(1), @@ -193,16 +195,20 @@ def _create_topics(self, assert set(names).issubset(self._get_topics().json()) return names - def _get_topics(self, headers=HTTP_GET_TOPICS_HEADERS): - return requests.get(f"{self._base_uri()}/topics", headers=headers) + def _get_topics(self, headers=HTTP_GET_TOPICS_HEADERS, **kwargs): + return requests.get(f"{self._base_uri()}/topics", + headers=headers, + **kwargs) def _produce_topic(self, topic, data, - headers=HTTP_PRODUCE_BINARY_V2_TOPIC_HEADERS): + headers=HTTP_PRODUCE_BINARY_V2_TOPIC_HEADERS, + **kwargs): return requests.post(f"{self._base_uri()}/topics/{topic}", data, - headers=headers) + headers=headers, + **kwargs) def _fetch_topic(self, topic, @@ -210,12 +216,17 @@ def _fetch_topic(self, offset=0, max_bytes=1024, timeout_ms=1000, - headers=HTTP_FETCH_TOPIC_HEADERS): + headers=HTTP_FETCH_TOPIC_HEADERS, + **kwargs): return requests.get( f"{self._base_uri()}/topics/{topic}/partitions/{partition}/records?offset={offset}&max_bytes={max_bytes}&timeout={timeout_ms}", - headers=headers) + headers=headers, + **kwargs) - def _create_consumer(self, group_id, headers=HTTP_CREATE_CONSUMER_HEADERS): + def _create_consumer(self, + group_id, + headers=HTTP_CREATE_CONSUMER_HEADERS, + **kwargs): res = requests.post(f"{self._base_uri()}/consumers/{group_id}", ''' { @@ -225,9 +236,18 @@ def _create_consumer(self, group_id, headers=HTTP_CREATE_CONSUMER_HEADERS): "fetch.min.bytes": "1", "consumer.request.timeout.ms": "10000" }''', - headers=headers) + headers=headers, + **kwargs) return res + +class PandaProxyTest(PandaProxyEndpoints): + """ + Test pandaproxy against a redpanda cluster. + """ + def __init__(self, context): + super(PandaProxyTest, self).__init__(context) + def _create_named_consumer(self, group_id, name, @@ -857,35 +877,20 @@ def test_consumer_group_json_v2(self): assert rc_res.status_code == requests.codes.no_content -class PandaProxySASLTest(RedpandaTest): +class PandaProxySASLTest(PandaProxyEndpoints): """ Test pandaproxy can connect using SASL. """ def __init__(self, context): - extra_rp_conf = dict(auto_create_topics_enabled=False, ) - security = SecurityConfig() security.enable_sasl = True - super(PandaProxySASLTest, self).__init__(context, - num_brokers=3, - enable_pp=True, - security=security, - extra_rp_conf=extra_rp_conf) - - http.client.HTTPConnection.debuglevel = 1 - http.client.print = lambda *args: self.logger.debug(" ".join(args)) + super(PandaProxySASLTest, self).__init__(context, security=security) def _get_super_client(self): user, password, _ = self.redpanda.SUPERUSER_CREDENTIALS return KafkaCliTools(self.redpanda, user=user, passwd=password) - def _base_uri(self): - return f"http://{self.redpanda.nodes[0].account.hostname}:8082" - - def _get_topics(self, headers=HTTP_GET_TOPICS_HEADERS): - return requests.get(f"{self._base_uri()}/topics", headers=headers) - @cluster(num_nodes=3) def test_list_topics(self): client = self._get_super_client() @@ -907,56 +912,27 @@ def topics_appeared(): err_msg="Timeout waiting for topics to appear.") -class PandaProxyBasicAuthTest(RedpandaTest): - password = 'simple' - algorithm = 'SCRAM-SHA-256' - +class PandaProxyBasicAuthTest(PandaProxyEndpoints): def __init__(self, context): - extra_rp_conf = dict(auto_create_topics_enabled=False, ) - security = SecurityConfig() security.enable_sasl = True security.endpoint_authn_method = 'sasl' security.pp_authn_method = 'http_basic' - super(PandaProxyBasicAuthTest, - self).__init__(context, - num_brokers=3, - enable_pp=True, - security=security, - extra_rp_conf=extra_rp_conf) - - self.admin = Admin(self.redpanda) - - http.client.HTTPConnection.debuglevel = 1 - http.client.print = lambda *args: self.logger.debug(" ".join(args)) - - def _base_uri(self): - return f"http://{self.redpanda.nodes[0].account.hostname}:8082" - - def _get_brokers(self, headers=HTTP_GET_BROKERS_HEADERS): - return requests.get(f"{self._base_uri()}/brokers", headers=headers) - - def encode_base64(self, username: str, password: str): - msg = f'{username}:{password}' - # The decode at the end removes bytes type - return base64.b64encode(msg.encode('ascii')).decode() + super(PandaProxyBasicAuthTest, self).__init__(context, + security=security) @cluster(num_nodes=3) def test_basic_auth(self): - # Regular user has no access to Kafka API - headers = { - "authorization": - "Basic " + self.encode_base64("panda", self.password) - } - brokers = self._get_brokers(headers=headers).json() - assert brokers['error_code'] == 40101 + brokers = self._get_brokers(auth=('red', 'panda')).json() + assert brokers['error_code'] == 401 # Super user has full access to Kafka API - username, password, _ = self.redpanda.SUPERUSER_CREDENTIALS - headers = { - "authorization": "Basic " + self.encode_base64(username, password) - } - brokers = self._get_brokers(headers=headers).json() - brokers['brokers'].sort() - assert brokers['brokers'] == [1, 2, 3] + super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS + brokers_raw = self._get_brokers(auth=(super_username, super_password)) + brokers = brokers_raw.json()['brokers'] + + nodes = enumerate(self.redpanda.nodes, 1) + node_idxs = [node[0] for node in nodes] + + assert sorted(brokers) == sorted(node_idxs)