Skip to content

Commit

Permalink
sr/sharded_store: Wire up compatibility_result
Browse files Browse the repository at this point in the history
Signed-off-by: Oren Leiman <oren.leiman@redpanda.com>
  • Loading branch information
oleiman authored and pgellert committed Aug 19, 2024
1 parent b27954c commit 25fcd5e
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 45 deletions.
11 changes: 8 additions & 3 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "json/document.h"
#include "json/json.h"
#include "json/types.h"
#include "pandaproxy/schema_registry/compatibility.h"
#include "pandaproxy/schema_registry/error.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/sharded_store.h"
Expand Down Expand Up @@ -550,9 +551,13 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) {
def.refs()};
}

bool check_compatible(
const avro_schema_definition& reader, const avro_schema_definition& writer) {
return check_compatible(*reader().root(), *writer().root());
compatibility_result check_compatible(
const avro_schema_definition& reader,
const avro_schema_definition& writer,
verbose is_verbose [[maybe_unused]]) {
// TODO(gellert.nagy): start using the is_verbose flag in a follow up PR
return compatibility_result{
.is_compat = check_compatible(*reader().root(), *writer().root())};
}

} // namespace pandaproxy::schema_registry
6 changes: 4 additions & 2 deletions src/v/pandaproxy/schema_registry/avro.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ make_avro_schema_definition(sharded_store& store, canonical_schema schema);
result<canonical_schema_definition>
sanitize_avro_schema_definition(unparsed_schema_definition def);

bool check_compatible(
const avro_schema_definition& reader, const avro_schema_definition& writer);
compatibility_result check_compatible(
const avro_schema_definition& reader,
const avro_schema_definition& writer,
verbose is_verbose = verbose::no);

} // namespace pandaproxy::schema_registry
25 changes: 16 additions & 9 deletions src/v/pandaproxy/schema_registry/json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1570,15 +1570,22 @@ ss::future<canonical_schema> make_canonical_json_schema(
co_return schema;
}

bool check_compatible(
const json_schema_definition& reader, const json_schema_definition& writer) {
// schemas might be using incompatible dialects
if (!check_compatible_dialects(reader().doc, writer().doc)) {
return false;
}
// reader is a superset of writer iff every schema that is valid for writer
// is also valid for reader
return is_superset(reader().doc, writer().doc);
compatibility_result check_compatible(
const json_schema_definition& reader,
const json_schema_definition& writer,
verbose is_verbose [[maybe_unused]]) {
auto is_compatible = [&]() {
// schemas might be using incompatible dialects
if (!check_compatible_dialects(reader().doc, writer().doc)) {
return false;
}
// reader is a superset of writer iff every schema that is valid for
// writer is also valid for reader
return is_superset(reader().doc, writer().doc);
}();

// TODO(gellert.nagy): start using the is_verbose flag in a follow up PR
return compatibility_result{.is_compat = is_compatible};
}

} // namespace pandaproxy::schema_registry
6 changes: 4 additions & 2 deletions src/v/pandaproxy/schema_registry/json.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ make_json_schema_definition(sharded_store& store, canonical_schema schema);
ss::future<canonical_schema> make_canonical_json_schema(
sharded_store& store, unparsed_schema def, normalize norm = normalize::no);

bool check_compatible(
const json_schema_definition& reader, const json_schema_definition& writer);
compatibility_result check_compatible(
const json_schema_definition& reader,
const json_schema_definition& writer,
verbose is_verbose = verbose::no);

} // namespace pandaproxy::schema_registry
8 changes: 5 additions & 3 deletions src/v/pandaproxy/schema_registry/protobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -689,11 +689,13 @@ struct compatibility_checker {

} // namespace

bool check_compatible(
compatibility_result check_compatible(
const protobuf_schema_definition& reader,
const protobuf_schema_definition& writer) {
const protobuf_schema_definition& writer,
verbose is_verbose [[maybe_unused]]) {
compatibility_checker checker{reader(), writer()};
return checker.check_compatible();
// TODO(gellert.nagy): start using the is_verbose flag in a follow up PR
return compatibility_result{.is_compat = checker.check_compatible()};
}

} // namespace pandaproxy::schema_registry
Expand Down
5 changes: 3 additions & 2 deletions src/v/pandaproxy/schema_registry/protobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ validate_protobuf_schema(sharded_store& store, canonical_schema schema);
ss::future<canonical_schema>
make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema);

bool check_compatible(
compatibility_result check_compatible(
const protobuf_schema_definition& reader,
const protobuf_schema_definition& writer);
const protobuf_schema_definition& writer,
verbose is_verbose = verbose::no);

} // namespace pandaproxy::schema_registry
94 changes: 79 additions & 15 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "pandaproxy/schema_registry/protobuf.h"
#include "pandaproxy/schema_registry/store.h"
#include "pandaproxy/schema_registry/types.h"
#include "pandaproxy/schema_registry/util.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
Expand All @@ -49,13 +50,14 @@ ss::shard_id shard_for(schema_id id) {
return jump_consistent_hash(id(), ss::smp::count);
}

bool check_compatible(const valid_schema& reader, const valid_schema& writer) {
return reader.visit([&](const auto& reader) {
return writer.visit([&](const auto& writer) {
compatibility_result check_compatible(
const valid_schema& reader, const valid_schema& writer, verbose is_verbose) {
return reader.visit([&](const auto& reader) -> compatibility_result {
return writer.visit([&](const auto& writer) -> compatibility_result {
if constexpr (std::is_same_v<decltype(reader), decltype(writer)>) {
return check_compatible(reader, writer);
return check_compatible(reader, writer, is_verbose);
}
return false;
return {.is_compat = false};
});
});
}
Expand Down Expand Up @@ -682,6 +684,18 @@ ss::future<> sharded_store::maybe_update_max_schema_id(schema_id id) {

ss::future<bool> sharded_store::is_compatible(
schema_version version, canonical_schema new_schema) {
auto rslt = co_await do_is_compatible(
version, std::move(new_schema), verbose::no);
co_return rslt.is_compat;
}

ss::future<compatibility_result> sharded_store::is_compatible(
schema_version version, canonical_schema new_schema, verbose is_verbose) {
return do_is_compatible(version, std::move(new_schema), is_verbose);
}

ss::future<compatibility_result> sharded_store::do_is_compatible(
schema_version version, canonical_schema new_schema, verbose is_verbose) {
// Lookup the version_ids
const auto sub = new_schema.sub();
const auto versions = co_await _store.invoke_on(
Expand All @@ -707,16 +721,22 @@ ss::future<bool> sharded_store::is_compatible(
auto old_schema = co_await get_subject_schema(
sub, version, include_deleted::no);

// Lookup the compatibility level
auto compat = co_await get_compatibility(sub, default_to_global::yes);

// Types must always match
if (old_schema.schema.type() != new_schema.type()) {
co_return false;
compatibility_result result{.is_compat = false};
if (is_verbose) {
result.messages = {
"Incompatible because of different schema type",
fmt::format("{{compatibility: {}}}", compat)};
}
co_return result;
}

// Lookup the compatibility level
auto compat = co_await get_compatibility(sub, default_to_global::yes);

if (compat == compatibility_level::none) {
co_return true;
co_return compatibility_result{.is_compat = true};
}

// Currently support JSON, PROTOBUF, AVRO
Expand All @@ -742,8 +762,18 @@ ss::future<bool> sharded_store::is_compatible(

auto new_valid = co_await make_valid_schema(std::move(new_schema));

auto is_compat = true;
for (; is_compat && ver_it != versions.end(); ++ver_it) {
compatibility_result result{.is_compat = true};

auto formatter = [](std::string_view rdr, std::string_view wrtr) {
return [rdr, wrtr](std::string_view msg) {
return fmt::format(
fmt::runtime(msg),
fmt::arg("reader", rdr),
fmt::arg("writer", wrtr));
};
};

for (; result.is_compat && ver_it != versions.end(); ++ver_it) {
if (ver_it->deleted) {
continue;
}
Expand All @@ -753,22 +783,56 @@ ss::future<bool> sharded_store::is_compatible(
auto old_valid = co_await make_valid_schema(
std::move(old_schema.schema));

std::vector<ss::sstring> version_messages;

if (
compat == compatibility_level::backward
|| compat == compatibility_level::backward_transitive
|| compat == compatibility_level::full
|| compat == compatibility_level::full_transitive) {
is_compat = is_compat && check_compatible(new_valid, old_valid);
auto r = check_compatible(new_valid, old_valid, is_verbose);
result.is_compat = result.is_compat && r.is_compat;
version_messages.reserve(
version_messages.size() + r.messages.size());
std::transform(
std::make_move_iterator(r.messages.begin()),
std::make_move_iterator(r.messages.end()),
std::back_inserter(version_messages),
formatter("new", "old"));
}
if (
compat == compatibility_level::forward
|| compat == compatibility_level::forward_transitive
|| compat == compatibility_level::full
|| compat == compatibility_level::full_transitive) {
is_compat = is_compat && check_compatible(old_valid, new_valid);
auto r = check_compatible(old_valid, new_valid, is_verbose);
result.is_compat = result.is_compat && r.is_compat;
version_messages.reserve(
version_messages.size() + r.messages.size());
std::transform(
std::make_move_iterator(r.messages.begin()),
std::make_move_iterator(r.messages.end()),
std::back_inserter(version_messages),
formatter("old", "new"));
}

if (is_verbose && !result.is_compat) {
version_messages.emplace_back(
fmt::format("{{oldSchemaVersion: {}}}", old_schema.version));
version_messages.emplace_back(
fmt::format("{{oldSchema: '{}'}}", to_string(old_valid.raw())));
version_messages.emplace_back(
fmt::format("{{compatibility: {}}}", compat));
}

result.messages.reserve(
result.messages.size() + version_messages.size());
std::move(
version_messages.begin(),
version_messages.end(),
std::back_inserter(result.messages));
}
co_return is_compat;
co_return result;
}

void sharded_store::check_mode_mutability(force f) const {
Expand Down
12 changes: 12 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,24 @@ class sharded_store {
ss::future<bool>
is_compatible(schema_version version, canonical_schema new_schema);

///\brief Check if the provided schema is compatible with the subject and
/// version, according the the current compatibility, with the result
/// optionally accompanied by a vector of detailed error messages.
///
/// If the compatibility level is transitive, then all versions are checked,
/// otherwise checks are against the version provided and newer.
ss::future<compatibility_result> is_compatible(
schema_version version, canonical_schema new_schema, verbose is_verbose);

ss::future<bool> has_version(const subject&, schema_id, include_deleted);

//// \brief Throw if the store is not mutable
void check_mode_mutability(force f) const;

private:
ss::future<compatibility_result> do_is_compatible(
schema_version version, canonical_schema new_schema, verbose is_verbose);

ss::future<bool>
upsert_schema(schema_id id, canonical_schema_definition def);
ss::future<> delete_schema(schema_id id);
Expand Down
13 changes: 7 additions & 6 deletions src/v/pandaproxy/schema_registry/test/compatibility_avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ bool check_compatible(
const pps::canonical_schema_definition& w) {
pps::sharded_store s;
return check_compatible(
pps::make_avro_schema_definition(
s, {pps::subject("r"), {r.shared_raw(), pps::schema_type::avro}})
.get(),
pps::make_avro_schema_definition(
s, {pps::subject("w"), {w.shared_raw(), pps::schema_type::avro}})
.get());
pps::make_avro_schema_definition(
s, {pps::subject("r"), {r.shared_raw(), pps::schema_type::avro}})
.get(),
pps::make_avro_schema_definition(
s, {pps::subject("w"), {w.shared_raw(), pps::schema_type::avro}})
.get())
.is_compat;
}

SEASTAR_THREAD_TEST_CASE(test_avro_type_promotion) {
Expand Down
12 changes: 9 additions & 3 deletions src/v/pandaproxy/schema_registry/test/test_json_schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
namespace pp = pandaproxy;
namespace pps = pp::schema_registry;

bool check_compatible(
const pps::json_schema_definition& reader_schema,
const pps::json_schema_definition& writer_schema) {
return pps::check_compatible(reader_schema, writer_schema).is_compat;
}

struct store_fixture {
store_fixture() {
store.start(pps::is_mutable::yes, ss::default_smp_service_group())
Expand Down Expand Up @@ -979,14 +985,14 @@ SEASTAR_THREAD_TEST_CASE(test_compatibility_check) {
try {
// sanity check that each schema is compatible with itself
BOOST_CHECK_MESSAGE(
pps::check_compatible(
::check_compatible(
make_json_schema(data.reader_schema),
make_json_schema(data.reader_schema)),
fmt::format(
"reader '{}' should be compatible with itself",
data.reader_schema));
BOOST_CHECK_MESSAGE(
pps::check_compatible(
::check_compatible(
make_json_schema(data.writer_schema),
make_json_schema(data.writer_schema)),
fmt::format(
Expand All @@ -996,7 +1002,7 @@ SEASTAR_THREAD_TEST_CASE(test_compatibility_check) {
// check compatibility (or not) reader->writer
BOOST_CHECK_EQUAL(
data.reader_is_compatible_with_writer,
pps::check_compatible(
::check_compatible(
make_json_schema(data.reader_schema),
make_json_schema(data.writer_schema)));
} catch (...) {
Expand Down

0 comments on commit 25fcd5e

Please sign in to comment.