Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v22.3.x] Schema Registry: GET /schemas/ids/<id> should return references #11241

Merged
merged 6 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,10 @@ result<void> sanitize(json::Value::Array& a, json::MemoryPoolAllocator& alloc) {

} // namespace

avro_schema_definition::avro_schema_definition(avro::ValidSchema vs)
: _impl(std::move(vs)) {}
avro_schema_definition::avro_schema_definition(
avro::ValidSchema vs, canonical_schema_definition::references refs)
: _impl(std::move(vs))
, _refs(std::move(refs)) {}

const avro::ValidSchema& avro_schema_definition::operator()() const {
return _impl;
Expand Down Expand Up @@ -418,7 +420,7 @@ ss::future<collected_schema> collect_schema(
collected_schema collected,
ss::sstring name,
canonical_schema schema) {
for (auto& ref : std::move(schema).refs()) {
for (auto& ref : schema.def().refs()) {
if (!collected.contains(ref.name)) {
auto ss = co_await store.get_subject_schema(
std::move(ref.sub), ref.version, include_deleted::no);
Expand All @@ -438,10 +440,13 @@ make_avro_schema_definition(sharded_store& store, canonical_schema schema) {
std::optional<avro::Exception> ex;
try {
auto name = schema.sub()();
auto schema_refs = schema.def().refs();
auto refs = co_await collect_schema(store, {}, name, std::move(schema));
auto def = refs.flatten();
co_return avro_schema_definition{avro::compileJsonSchemaFromMemory(
reinterpret_cast<const uint8_t*>(def.data()), def.length())};
co_return avro_schema_definition{
avro::compileJsonSchemaFromMemory(
reinterpret_cast<const uint8_t*>(def.data()), def.length()),
std::move(schema_refs)};
} catch (const avro::Exception& e) {
ex = e;
}
Expand Down Expand Up @@ -488,7 +493,8 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) {

return canonical_schema_definition{
std::string_view{str_buf.GetString(), str_buf.GetSize()},
schema_type::avro};
schema_type::avro,
def.refs()};
}

bool check_compatible(
Expand Down
17 changes: 7 additions & 10 deletions src/v/pandaproxy/schema_registry/protobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ build_file(pb::DescriptorPool& dp, const pb::FileDescriptorProto& fdp) {
/// on stack unwind.
ss::future<const pb::FileDescriptor*> build_file_with_refs(
pb::DescriptorPool& dp, sharded_store& store, canonical_schema schema) {
for (const auto& ref : schema.refs()) {
for (const auto& ref : schema.def().refs()) {
if (dp.FindFileByName(ref.name)) {
continue;
}
Expand All @@ -281,10 +281,7 @@ ss::future<const pb::FileDescriptor*> build_file_with_refs(
co_await build_file_with_refs(
dp,
store,
canonical_schema{
subject{ref.name},
std::move(dep.schema).def(),
std::move(dep.schema).refs()});
canonical_schema{subject{ref.name}, std::move(dep.schema).def()});
}

parser p;
Expand Down Expand Up @@ -328,8 +325,9 @@ operator<<(std::ostream& os, const protobuf_schema_definition& def) {
ss::future<protobuf_schema_definition>
make_protobuf_schema_definition(sharded_store& store, canonical_schema schema) {
auto impl = ss::make_shared<protobuf_schema_definition::impl>();
auto refs = schema.def().refs();
impl->fd = co_await import_schema(impl->_dp, store, std::move(schema));
co_return protobuf_schema_definition{std::move(impl)};
co_return protobuf_schema_definition{std::move(impl), std::move(refs)};
}

ss::future<canonical_schema_definition>
Expand All @@ -344,12 +342,11 @@ make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema) {
canonical_schema temp{
std::move(schema).sub(),
{canonical_schema_definition::raw_string{schema.def().raw()()},
schema.def().type()},
std::move(schema).refs()};
schema.def().type(),
schema.def().refs()}};

auto validated = co_await validate_protobuf_schema(store, temp);
co_return canonical_schema{
std::move(temp).sub(), std::move(validated), std::move(temp).refs()};
co_return canonical_schema{std::move(temp).sub(), std::move(validated)};
}

namespace {
Expand Down
15 changes: 15 additions & 0 deletions src/v/pandaproxy/schema_registry/requests/get_schemas_ids_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ inline void rjson_serialize(
}
w.Key("schema");
::json::rjson_serialize(w, res.definition.raw());
if (!res.definition.refs().empty()) {
w.Key("references");
w.StartArray();
for (const auto& ref : res.definition.refs()) {
w.StartObject();
w.Key("name");
::json::rjson_serialize(w, ref.name);
w.Key("subject");
::json::rjson_serialize(w, ref.sub);
w.Key("version");
::json::rjson_serialize(w, ref.version);
w.EndObject();
}
w.EndArray();
}
w.EndObject();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ inline void rjson_serialize(
w.Key("schemaType");
::json::rjson_serialize(w, to_string_view(type));
}
if (!res.schema.refs().empty()) {
if (!res.schema.def().refs().empty()) {
w.Key("references");
w.StartArray();
for (const auto& ref : res.schema.refs()) {
for (const auto& ref : res.schema.def().refs()) {
w.StartObject();
w.Key("name");
::json::rjson_serialize(w, ref.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class post_subject_versions_request_handler
subject sub{invalid_subject};
unparsed_schema_definition::raw_string def;
schema_type type{schema_type::avro};
unparsed_schema::references refs;
unparsed_schema_definition::references refs;
};
mutable_schema _schema;

Expand Down Expand Up @@ -207,8 +207,7 @@ class post_subject_versions_request_handler
_state = state::empty;
result.def = {
std::move(_schema.sub),
{std::move(_schema.def), _schema.type},
std::move(_schema.refs)};
{std::move(_schema.def), _schema.type, std::move(_schema.refs)}};
return true;
}
case state::reference: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,14 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_version_response) {
R"({\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"type\":\"string\",\"name\":\"field1\"},{\"type\":\"com.acme.Referenced\",\"name\":\"int\"}]})"};
const pps::canonical_schema_definition schema_def{
R"({"type":"record","name":"test","fields":[{"type":"string","name":"field1"},{"type":"com.acme.Referenced","name":"int"}]})",
pps::schema_type::avro};
pps::schema_type::avro,
{{{"com.acme.Referenced"},
pps::subject{"childSubject"},
pps::schema_version{1}}}};
const pps::subject sub{"imported-ref"};

pps::post_subject_versions_version_response response{
.schema{
pps::subject{"imported-ref"},
schema_def,
{{{"com.acme.Referenced"},
pps::subject{"childSubject"},
pps::schema_version{1}}}},
.id{12},
.version{2}};
.schema{pps::subject{"imported-ref"}, schema_def}, .id{12}, .version{2}};

const ss::sstring expected{
R"(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) {
R"({\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"type\":\"string\",\"name\":\"field1\"},{\"type\":\"com.acme.Referenced\",\"name\":\"int\"}]})"};
const pps::unparsed_schema_definition expected_schema_def{
R"({"type":"record","name":"test","fields":[{"type":"string","name":"field1"},{"type":"com.acme.Referenced","name":"int"}]})",
pps::schema_type::avro};
pps::schema_type::avro,
{{.name{"com.acme.Referenced"},
.sub{pps::subject{"childSubject"}},
.version{pps::schema_version{1}}}}};

const ss::sstring payload{
R"(
Expand All @@ -47,13 +50,7 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) {
})"};
const pps::subject sub{"test_subject"};
const parse_result expected{
{sub,
expected_schema_def,
{{.name{"com.acme.Referenced"},
.sub{pps::subject{"childSubject"}},
.version{pps::schema_version{1}}}}},
std::nullopt,
std::nullopt};
{sub, expected_schema_def}, std::nullopt, std::nullopt};

auto result{ppj::rjson_parse(
payload.data(), pps::post_subject_versions_request_handler{sub})};
Expand All @@ -62,8 +59,10 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) {
result.def = {
std::move(result.def).sub(),
pps::unparsed_schema_definition{
ppj::minify(result.def.def().raw()()), pps::schema_type::avro},
std::move(result.def).refs()};
ppj::minify(result.def.def().raw()()),
pps::schema_type::avro,
std::move(result.def).def().refs()}};
// NOLINTEND(bugprone-use-after-move)

BOOST_REQUIRE_EQUAL(expected.def, result.def);
BOOST_REQUIRE_EQUAL(expected.id.has_value(), result.id.has_value());
Expand Down
84 changes: 46 additions & 38 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ bool check_compatible(const valid_schema& reader, const valid_schema& writer) {
});
}

constexpr auto set_accumulator =
[](store::schema_id_set acc, store::schema_id_set refs) {
acc.insert(refs.begin(), refs.end());
return acc;
};

} // namespace

ss::future<> sharded_store::start(ss::smp_service_group sg) {
Expand All @@ -73,8 +79,7 @@ sharded_store::make_canonical_schema(unparsed_schema schema) {
case schema_type::avro: {
co_return canonical_schema{
std::move(schema.sub()),
sanitize_avro_schema_definition(schema.def()).value(),
std::move(schema.refs())};
sanitize_avro_schema_definition(schema.def()).value()};
}
case schema_type::protobuf:
co_return co_await make_canonical_protobuf_schema(
Expand Down Expand Up @@ -217,12 +222,7 @@ ss::future<bool> sharded_store::upsert(
is_deleted deleted) {
co_await upsert_schema(id, std::move(schema).def());
co_return co_await upsert_subject(
marker,
std::move(schema).sub(),
std::move(schema).refs(),
version,
id,
deleted);
marker, std::move(schema).sub(), version, id, deleted);
}

ss::future<bool> sharded_store::has_schema(schema_id id) {
Expand Down Expand Up @@ -299,7 +299,7 @@ ss::future<subject_schema> sharded_store::get_subject_schema(
});

co_return subject_schema{
.schema = {sub, std::move(def), std::move(v_id.refs)},
.schema = {sub, std::move(def)},
.version = v_id.version,
.id = v_id.id,
.deleted = v_id.deleted};
Expand Down Expand Up @@ -328,10 +328,21 @@ sharded_store::get_versions(subject sub, include_deleted inc_del) {
}

ss::future<bool> sharded_store::is_referenced(subject sub, schema_version ver) {
auto map = [sub{std::move(sub)}, ver](store& s) {
return s.is_referenced(sub, ver);
};
co_return co_await _store.map_reduce0(map, false, std::logical_or<>{});
// Find all the schemas that reference this sub-ver
auto references = co_await _store.map_reduce0(
[sub{std::move(sub)}, ver](store& s) {
return s.referenced_by(sub, ver);
},
store::schema_id_set{},
set_accumulator);

// Find whether any subject version references any of the schemas
co_return co_await _store.map_reduce0(
[refs{std::move(references)}](store& s) {
return s.subject_versions_has_any_of(refs);
},
false,
std::logical_or<>{});
}

ss::future<std::vector<schema_id>> sharded_store::referenced_by(
Expand All @@ -346,17 +357,23 @@ ss::future<std::vector<schema_id>> sharded_store::referenced_by(
ver = versions.back();
}

auto map = [sub{std::move(sub)}, ver](store& s) {
return s.referenced_by(sub, ver);
};
auto reduce = [](std::vector<schema_id> acc, std::vector<schema_id> refs) {
acc.insert(acc.end(), refs.begin(), refs.end());
return acc;
};
// Find all the schemas that reference this sub-ver
auto references = co_await _store.map_reduce0(
map, std::vector<schema_id>{}, reduce);
std::sort(references.begin(), references.end());
co_return references;
[sub{std::move(sub)}, ver](store& s) {
return s.referenced_by(sub, ver);
},
store::schema_id_set{},
set_accumulator);

// Find all the subject versions that reference any of the schemas
references = co_await _store.map_reduce0(
[refs{std::move(references)}](store& s) {
return s.subject_versions_with_any_of(refs);
},
store::schema_id_set{},
set_accumulator);

co_return std::vector<schema_id>{references.begin(), references.end()};
}

ss::future<std::vector<schema_version>> sharded_store::delete_subject(
Expand Down Expand Up @@ -462,37 +479,28 @@ sharded_store::upsert_schema(schema_id id, canonical_schema_definition def) {
});
}

ss::future<sharded_store::insert_subject_result> sharded_store::insert_subject(
subject sub, canonical_schema::references refs, schema_id id) {
ss::future<sharded_store::insert_subject_result>
sharded_store::insert_subject(subject sub, schema_id id) {
auto sub_shard{shard_for(sub)};
auto [version, inserted] = co_await _store.invoke_on(
sub_shard,
_smp_opts,
[sub{std::move(sub)}, refs{std::move(refs)}, id](store& s) mutable {
return s.insert_subject(sub, std::move(refs), id);
sub_shard, _smp_opts, [sub{std::move(sub)}, id](store& s) mutable {
return s.insert_subject(sub, id);
});
co_return insert_subject_result{version, inserted};
}

ss::future<bool> sharded_store::upsert_subject(
seq_marker marker,
subject sub,
canonical_schema::references refs,
schema_version version,
schema_id id,
is_deleted deleted) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard,
_smp_opts,
[marker,
sub{std::move(sub)},
refs{std::move(refs)},
version,
id,
deleted](store& s) mutable {
return s.upsert_subject(
marker, std::move(sub), std::move(refs), version, id, deleted);
[marker, sub{std::move(sub)}, version, id, deleted](store& s) mutable {
return s.upsert_subject(marker, std::move(sub), version, id, deleted);
});
}

Expand Down
4 changes: 1 addition & 3 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,11 @@ class sharded_store {
schema_version version;
bool inserted;
};
ss::future<insert_subject_result> insert_subject(
subject sub, canonical_schema::references refs, schema_id id);
ss::future<insert_subject_result> insert_subject(subject sub, schema_id id);

ss::future<bool> upsert_subject(
seq_marker marker,
subject sub,
canonical_schema::references refs,
schema_version version,
schema_id id,
is_deleted deleted);
Expand Down
Loading