Skip to content

Commit

Permalink
schema_registry: Support subjectPrefix for get_subjects
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Jun 25, 2024
1 parent c732baa commit 2ff26f3
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 9 deletions.
5 changes: 4 additions & 1 deletion src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -421,12 +421,15 @@ get_subjects(server::request_t rq, server::reply_t rp) {
auto inc_del{
parse::query_param<std::optional<include_deleted>>(*rq.req, "deleted")
.value_or(include_deleted::no)};
auto subject_prefix{
parse::query_param<std::optional<ss::sstring>>(*rq.req, "subjectPrefix")};
rq.req.reset();

// List-type request: must ensure we see latest writes
co_await rq.service().writer().read_sync();

auto subjects = co_await rq.service().schema_store().get_subjects(inc_del);
auto subjects = co_await rq.service().schema_store().get_subjects(
inc_del, subject_prefix);
rp.rep->write_body(
"json",
ss::json::stream_range_as_array(
Expand Down
8 changes: 5 additions & 3 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,12 @@ ss::future<subject_schema> sharded_store::get_subject_schema(
.deleted = v_id.deleted};
}

ss::future<chunked_vector<subject>>
sharded_store::get_subjects(include_deleted inc_del) {
ss::future<chunked_vector<subject>> sharded_store::get_subjects(
include_deleted inc_del, std::optional<ss::sstring> subject_prefix) {
using subjects = chunked_vector<subject>;
auto map = [inc_del](store& s) { return s.get_subjects(inc_del); };
auto map = [inc_del, &subject_prefix](store& s) {
return s.get_subjects(inc_del, subject_prefix);
};
auto reduce = [](subjects acc, subjects subs) {
acc.reserve(acc.size() + subs.size());
std::move(subs.begin(), subs.end(), std::back_inserter(acc));
Expand Down
4 changes: 3 additions & 1 deletion src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ class sharded_store {
include_deleted inc_dec);

///\brief Return a list of subjects.
ss::future<chunked_vector<subject>> get_subjects(include_deleted inc_del);
ss::future<chunked_vector<subject>> get_subjects(
include_deleted inc_del,
std::optional<ss::sstring> subject_prefix = std::nullopt);

///\brief Return whether there are any subjects.
ss::future<bool> has_subjects(include_deleted inc_del);
Expand Down
8 changes: 6 additions & 2 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,19 @@ class store {
}

///\brief Return a list of subjects.
chunked_vector<subject> get_subjects(include_deleted inc_del) const {
chunked_vector<subject> get_subjects(
include_deleted inc_del,
const std::optional<ss::sstring>& subject_prefix = std::nullopt) const {
chunked_vector<subject> res;
res.reserve(_subjects.size());
for (const auto& sub : _subjects) {
if (inc_del || !sub.second.deleted) {
auto has_version = absl::c_any_of(
sub.second.versions,
[inc_del](auto const& v) { return inc_del || !v.deleted; });
if (has_version) {
if (
has_version
&& sub.first().starts_with(subject_prefix.value_or(""))) {
res.push_back(sub.first);
}
}
Expand Down
53 changes: 51 additions & 2 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,19 @@ def _get_schemas_ids_id_subjects(self,
headers=headers,
**kwargs)

def _get_subjects(self, deleted=False, headers=HTTP_GET_HEADERS, **kwargs):
def _get_subjects(self,
deleted=False,
subject_prefix=None,
headers=HTTP_GET_HEADERS,
**kwargs):
params = {}
if deleted:
params['deleted'] = 'true'
if subject_prefix:
params['subjectPrefix'] = subject_prefix
return self._request("GET",
f"subjects{'?deleted=true' if deleted else ''}",
"subjects",
params=params,
headers=headers,
**kwargs)

Expand Down Expand Up @@ -2248,6 +2258,45 @@ def test_get_schema_id_versions(self):
assert result_raw.status_code == requests.codes.ok
assert result_raw.json() == [{"subject": subject, "version": 1}]

@cluster(num_nodes=3)
def test_get_subjects(self):
"""
Verify getting subjects
"""
self._init_users()

topics = ['a', 'aa', 'b', 'ab', 'bb']

schema_1_data = json.dumps({"schema": schema1_def})

self.logger.debug("Posting schemas 1 as subject keys")

def post(topic):
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key",
data=schema_1_data,
auth=self.super_auth)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok

for t in topics:
post(t)

def get_subjects(prefix: Optional[str]):
result_raw = self._get_subjects(subject_prefix=prefix,
auth=self.super_auth)
assert result_raw.status_code == requests.codes.ok

return result_raw.json()

assert len(get_subjects(prefix=None)) == 5
assert len(get_subjects(prefix="")) == 5
assert len(get_subjects(prefix="a")) == 3
assert len(get_subjects(prefix="aa")) == 1
assert len(get_subjects(prefix="aaa")) == 0
assert len(get_subjects(prefix="b")) == 2
assert len(get_subjects(prefix="bb")) == 1

@cluster(num_nodes=3)
def test_post_subjects_subject_versions(self):
"""
Expand Down

0 comments on commit 2ff26f3

Please sign in to comment.