Skip to content

Commit

Permalink
schema_registry: Refactor: Move references to schema
Browse files Browse the repository at this point in the history
This should be a pure refactoring.

The store has two mappings:
1) `schema_map`: Schema ID to Schema (sharded across cores)
2) `subject_map`: Subject-Versions to Schema ID

Each of these mappings is then split across shards, coordinated
by `sharded_store`.

Prior to this change, references were attached to the Subject-Versions.

This is incorrect modelling; a schema should own its references
to other schemas. This commit does that refactoring by moving references
from `typed_schema` to `typed_schema_definition`.

There are a couple of things that are not straightforward:
1) `ValidSchema` (`protobuf_schema_definition`, `avro_schema_definition`)
   are the respective library instantiations of the representation,
   and are unable to reproduce their references to form a
   `canonical_schema_definition`, so references are stored inside the
   wrappers.
2) `referenced_by` and `is_referenced` now require two phases to
   obtain the references, due to the sharded and disconnected nature of
   the two mappings (schema, and subject-versions).
   a) Obtain a list of schema ids that reference the subject-version
   b) Filter that list by subject-versions that are not soft-deleted

Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Jun 5, 2023
1 parent 5d5072b commit 8789740
Show file tree
Hide file tree
Showing 19 changed files with 219 additions and 189 deletions.
17 changes: 11 additions & 6 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,10 @@ result<void> sanitize(json::Value::Array& a, json::MemoryPoolAllocator& alloc) {

} // namespace

avro_schema_definition::avro_schema_definition(avro::ValidSchema vs)
: _impl(std::move(vs)) {}
avro_schema_definition::avro_schema_definition(
avro::ValidSchema vs, canonical_schema_definition::references refs)
: _impl(std::move(vs))
, _refs(std::move(refs)) {}

const avro::ValidSchema& avro_schema_definition::operator()() const {
return _impl;
Expand Down Expand Up @@ -422,7 +424,7 @@ ss::future<collected_schema> collect_schema(
collected_schema collected,
ss::sstring name,
canonical_schema schema) {
for (auto& ref : std::move(schema).refs()) {
for (auto& ref : schema.def().refs()) {
if (!collected.contains(ref.name)) {
auto ss = co_await store.get_subject_schema(
std::move(ref.sub), ref.version, include_deleted::no);
Expand All @@ -445,8 +447,10 @@ make_avro_schema_definition(sharded_store& store, canonical_schema schema) {
auto name = schema.sub()();
auto refs = co_await collect_schema(store, {}, name, std::move(schema));
auto def = refs.flatten();
co_return avro_schema_definition{avro::compileJsonSchemaFromMemory(
reinterpret_cast<const uint8_t*>(def.data()), def.length())};
co_return avro_schema_definition{
avro::compileJsonSchemaFromMemory(
reinterpret_cast<const uint8_t*>(def.data()), def.length()),
schema.def().refs()};
} catch (const avro::Exception& e) {
ex = e;
}
Expand Down Expand Up @@ -493,7 +497,8 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) {

return canonical_schema_definition{
std::string_view{str_buf.GetString(), str_buf.GetSize()},
schema_type::avro};
schema_type::avro,
def.refs()};
}

bool check_compatible(
Expand Down
17 changes: 7 additions & 10 deletions src/v/pandaproxy/schema_registry/protobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ build_file(pb::DescriptorPool& dp, const pb::FileDescriptorProto& fdp) {
/// on stack unwind.
ss::future<const pb::FileDescriptor*> build_file_with_refs(
pb::DescriptorPool& dp, sharded_store& store, canonical_schema schema) {
for (const auto& ref : schema.refs()) {
for (const auto& ref : schema.def().refs()) {
if (dp.FindFileByName(ref.name)) {
continue;
}
Expand All @@ -282,10 +282,7 @@ ss::future<const pb::FileDescriptor*> build_file_with_refs(
co_await build_file_with_refs(
dp,
store,
canonical_schema{
subject{ref.name},
std::move(dep.schema).def(),
std::move(dep.schema).refs()});
canonical_schema{subject{ref.name}, std::move(dep.schema).def()});
}

parser p;
Expand Down Expand Up @@ -345,8 +342,9 @@ operator<<(std::ostream& os, const protobuf_schema_definition& def) {
ss::future<protobuf_schema_definition>
make_protobuf_schema_definition(sharded_store& store, canonical_schema schema) {
auto impl = ss::make_shared<protobuf_schema_definition::impl>();
auto refs = schema.def().refs();
impl->fd = co_await import_schema(impl->_dp, store, std::move(schema));
co_return protobuf_schema_definition{std::move(impl)};
co_return protobuf_schema_definition{std::move(impl), std::move(refs)};
}

ss::future<canonical_schema_definition>
Expand All @@ -362,12 +360,11 @@ make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema) {
canonical_schema temp{
std::move(schema).sub(),
{canonical_schema_definition::raw_string{schema.def().raw()()},
schema.def().type()},
std::move(schema).refs()};
schema.def().type(),
schema.def().refs()}};

auto validated = co_await validate_protobuf_schema(store, temp);
co_return canonical_schema{
std::move(temp).sub(), std::move(validated), std::move(temp).refs()};
co_return canonical_schema{std::move(temp).sub(), std::move(validated)};
// NOLINTEND(bugprone-use-after-move)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ inline void rjson_serialize(
w.Key("schemaType");
::json::rjson_serialize(w, to_string_view(type));
}
if (!res.schema.refs().empty()) {
if (!res.schema.def().refs().empty()) {
w.Key("references");
w.StartArray();
for (const auto& ref : res.schema.refs()) {
for (const auto& ref : res.schema.def().refs()) {
w.StartObject();
w.Key("name");
::json::rjson_serialize(w, ref.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class post_subject_versions_request_handler
subject sub{invalid_subject};
unparsed_schema_definition::raw_string def;
schema_type type{schema_type::avro};
unparsed_schema::references refs;
unparsed_schema_definition::references refs;
};
mutable_schema _schema;

Expand Down Expand Up @@ -207,8 +207,7 @@ class post_subject_versions_request_handler
_state = state::empty;
result.def = {
std::move(_schema.sub),
{std::move(_schema.def), _schema.type},
std::move(_schema.refs)};
{std::move(_schema.def), _schema.type, std::move(_schema.refs)}};
return true;
}
case state::reference: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,14 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_version_response) {
R"({\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"type\":\"string\",\"name\":\"field1\"},{\"type\":\"com.acme.Referenced\",\"name\":\"int\"}]})"};
const pps::canonical_schema_definition schema_def{
R"({"type":"record","name":"test","fields":[{"type":"string","name":"field1"},{"type":"com.acme.Referenced","name":"int"}]})",
pps::schema_type::avro};
pps::schema_type::avro,
{{{"com.acme.Referenced"},
pps::subject{"childSubject"},
pps::schema_version{1}}}};
const pps::subject sub{"imported-ref"};

pps::post_subject_versions_version_response response{
.schema{
pps::subject{"imported-ref"},
schema_def,
{{{"com.acme.Referenced"},
pps::subject{"childSubject"},
pps::schema_version{1}}}},
.id{12},
.version{2}};
.schema{pps::subject{"imported-ref"}, schema_def}, .id{12}, .version{2}};

const ss::sstring expected{
R"(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) {
R"({\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"type\":\"string\",\"name\":\"field1\"},{\"type\":\"com.acme.Referenced\",\"name\":\"int\"}]})"};
const pps::unparsed_schema_definition expected_schema_def{
R"({"type":"record","name":"test","fields":[{"type":"string","name":"field1"},{"type":"com.acme.Referenced","name":"int"}]})",
pps::schema_type::avro};
pps::schema_type::avro,
{{.name{"com.acme.Referenced"},
.sub{pps::subject{"childSubject"}},
.version{pps::schema_version{1}}}}};

const ss::sstring payload{
R"(
Expand All @@ -47,13 +50,7 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) {
})"};
const pps::subject sub{"test_subject"};
const parse_result expected{
{sub,
expected_schema_def,
{{.name{"com.acme.Referenced"},
.sub{pps::subject{"childSubject"}},
.version{pps::schema_version{1}}}}},
std::nullopt,
std::nullopt};
{sub, expected_schema_def}, std::nullopt, std::nullopt};

auto result{ppj::rjson_parse(
payload.data(), pps::post_subject_versions_request_handler{sub})};
Expand All @@ -63,8 +60,9 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) {
result.def = {
std::move(result.def).sub(),
pps::unparsed_schema_definition{
ppj::minify(result.def.def().raw()()), pps::schema_type::avro},
std::move(result.def).refs()};
ppj::minify(result.def.def().raw()()),
pps::schema_type::avro,
std::move(result.def).def().refs()}};
// NOLINTEND(bugprone-use-after-move)

BOOST_REQUIRE_EQUAL(expected.def, result.def);
Expand Down
89 changes: 56 additions & 33 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ sharded_store::make_canonical_schema(unparsed_schema schema) {
case schema_type::avro: {
co_return canonical_schema{
std::move(schema.sub()),
sanitize_avro_schema_definition(schema.def()).value(),
std::move(schema.refs())};
sanitize_avro_schema_definition(schema.def()).value()};
}
case schema_type::protobuf:
co_return co_await make_canonical_protobuf_schema(
Expand Down Expand Up @@ -220,8 +219,6 @@ ss::future<bool> sharded_store::upsert(
marker,
// NOLINTNEXTLINE(bugprone-use-after-move)
std::move(schema).sub(),
// NOLINTNEXTLINE(bugprone-use-after-move)
std::move(schema).refs(),
version,
id,
deleted);
Expand Down Expand Up @@ -301,7 +298,7 @@ ss::future<subject_schema> sharded_store::get_subject_schema(
});

co_return subject_schema{
.schema = {sub, std::move(def), std::move(v_id.refs)},
.schema = {sub, std::move(def)},
.version = v_id.version,
.id = v_id.id,
.deleted = v_id.deleted};
Expand Down Expand Up @@ -330,10 +327,27 @@ sharded_store::get_versions(subject sub, include_deleted inc_del) {
}

ss::future<bool> sharded_store::is_referenced(subject sub, schema_version ver) {
auto map = [sub{std::move(sub)}, ver](store& s) {
return s.is_referenced(sub, ver);
};
co_return co_await _store.map_reduce0(map, false, std::logical_or<>{});
// Find all the schema that reference this sub-ver
auto references = co_await _store.map_reduce0(
[sub{std::move(sub)}, ver](store& s) {
return s.referenced_by(sub, ver);
},
std::vector<schema_id>{},
[](std::vector<schema_id> acc, std::vector<schema_id> refs) {
acc.insert(acc.end(), refs.begin(), refs.end());
return acc;
});
std::sort(references.begin(), references.end());
auto end = std::unique(references.begin(), references.end());
references.resize(std::distance(references.begin(), end));

// Find whether any subject version reference any of the schema
co_return co_await _store.map_reduce0(
[refs{std::move(references)}](store& s) {
return s.subject_versions_has_any_of(refs);
},
false,
std::logical_or<>{});
}

ss::future<std::vector<schema_id>> sharded_store::referenced_by(
Expand All @@ -348,16 +362,34 @@ ss::future<std::vector<schema_id>> sharded_store::referenced_by(
ver = versions.back();
}

auto map = [sub{std::move(sub)}, ver](store& s) {
return s.referenced_by(sub, ver);
};
auto reduce = [](std::vector<schema_id> acc, std::vector<schema_id> refs) {
acc.insert(acc.end(), refs.begin(), refs.end());
return acc;
};
// Find all the schema that reference this sub-ver
auto references = co_await _store.map_reduce0(
map, std::vector<schema_id>{}, reduce);
std::sort(references.begin(), references.end());
[sub{std::move(sub)}, ver](store& s) {
return s.referenced_by(sub, ver);
},
std::vector<schema_id>{},
[](std::vector<schema_id> acc, std::vector<schema_id> refs) {
acc.insert(acc.end(), refs.begin(), refs.end());
return acc;
});
absl::c_sort(references);
auto end = std::unique(references.begin(), references.end());
references.resize(std::distance(references.begin(), end));

// Find all the subject versions that reference any of the schema
references = co_await _store.map_reduce0(
[refs{std::move(references)}](store& s) {
return s.subject_versions_with_any_of(refs);
},
std::vector<schema_id>{},
[](std::vector<schema_id> acc, std::vector<schema_id> refs) {
acc.insert(acc.end(), refs.begin(), refs.end());
return acc;
});

absl::c_sort(references);
end = std::unique(references.begin(), references.end());
references.resize(std::distance(references.begin(), end));
co_return references;
}

Expand Down Expand Up @@ -464,37 +496,28 @@ sharded_store::upsert_schema(schema_id id, canonical_schema_definition def) {
});
}

ss::future<sharded_store::insert_subject_result> sharded_store::insert_subject(
subject sub, canonical_schema::references refs, schema_id id) {
ss::future<sharded_store::insert_subject_result>
sharded_store::insert_subject(subject sub, schema_id id) {
auto sub_shard{shard_for(sub)};
auto [version, inserted] = co_await _store.invoke_on(
sub_shard,
_smp_opts,
[sub{std::move(sub)}, refs{std::move(refs)}, id](store& s) mutable {
return s.insert_subject(sub, std::move(refs), id);
sub_shard, _smp_opts, [sub{std::move(sub)}, id](store& s) mutable {
return s.insert_subject(sub, id);
});
co_return insert_subject_result{version, inserted};
}

ss::future<bool> sharded_store::upsert_subject(
seq_marker marker,
subject sub,
canonical_schema::references refs,
schema_version version,
schema_id id,
is_deleted deleted) {
auto sub_shard{shard_for(sub)};
co_return co_await _store.invoke_on(
sub_shard,
_smp_opts,
[marker,
sub{std::move(sub)},
refs{std::move(refs)},
version,
id,
deleted](store& s) mutable {
return s.upsert_subject(
marker, std::move(sub), std::move(refs), version, id, deleted);
[marker, sub{std::move(sub)}, version, id, deleted](store& s) mutable {
return s.upsert_subject(marker, std::move(sub), version, id, deleted);
});
}

Expand Down
4 changes: 1 addition & 3 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,11 @@ class sharded_store {
schema_version version;
bool inserted;
};
ss::future<insert_subject_result> insert_subject(
subject sub, canonical_schema::references refs, schema_id id);
ss::future<insert_subject_result> insert_subject(subject sub, schema_id id);

ss::future<bool> upsert_subject(
seq_marker marker,
subject sub,
canonical_schema::references refs,
schema_version version,
schema_id id,
is_deleted deleted);
Expand Down
9 changes: 4 additions & 5 deletions src/v/pandaproxy/schema_registry/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ inline void rjson_serialize(
w.Key("schemaType");
::json::rjson_serialize(w, to_string_view(type));
}
if (!val.schema.refs().empty()) {
if (!val.schema.def().refs().empty()) {
w.Key("references");
w.StartArray();
for (const auto& ref : val.schema.refs()) {
for (const auto& ref : val.schema.def().refs()) {
w.StartObject();
w.Key("name");
::json::rjson_serialize(w, ref.name);
Expand Down Expand Up @@ -381,7 +381,7 @@ class schema_value_handler final : public json::base_handler<Encoding> {
subject sub{invalid_subject};
typename typed_schema_definition<Tag>::raw_string def;
schema_type type{schema_type::avro};
typename typed_schema<Tag>::references refs;
typename typed_schema_definition<Tag>::references refs;
};
mutable_schema _schema;

Expand Down Expand Up @@ -572,8 +572,7 @@ class schema_value_handler final : public json::base_handler<Encoding> {
_state = state::empty;
result.schema = {
std::move(_schema.sub),
{std::move(_schema.def), _schema.type},
std::move(_schema.refs)};
{std::move(_schema.def), _schema.type, std::move(_schema.refs)}};
return true;
}
case state::reference: {
Expand Down
Loading

0 comments on commit 8789740

Please sign in to comment.