diff --git a/src/v/pandaproxy/schema_registry/avro.cc b/src/v/pandaproxy/schema_registry/avro.cc index 86658b44c54c..1c700d713b0c 100644 --- a/src/v/pandaproxy/schema_registry/avro.cc +++ b/src/v/pandaproxy/schema_registry/avro.cc @@ -17,6 +17,7 @@ #include "json/document.h" #include "json/json.h" #include "json/types.h" +#include "pandaproxy/schema_registry/compatibility.h" #include "pandaproxy/schema_registry/error.h" #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/sharded_store.h" @@ -550,9 +551,13 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) { def.refs()}; } -bool check_compatible( - const avro_schema_definition& reader, const avro_schema_definition& writer) { - return check_compatible(*reader().root(), *writer().root()); +compatibility_result check_compatible( + const avro_schema_definition& reader, + const avro_schema_definition& writer, + verbose is_verbose [[maybe_unused]]) { + // TODO(gellert.nagy): start using the is_verbose flag in a follow up PR + return compatibility_result{ + .is_compat = check_compatible(*reader().root(), *writer().root())}; } } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/avro.h b/src/v/pandaproxy/schema_registry/avro.h index 80c38d1a47a9..9d13a67c490e 100644 --- a/src/v/pandaproxy/schema_registry/avro.h +++ b/src/v/pandaproxy/schema_registry/avro.h @@ -23,7 +23,9 @@ make_avro_schema_definition(sharded_store& store, canonical_schema schema); result sanitize_avro_schema_definition(unparsed_schema_definition def); -bool check_compatible( - const avro_schema_definition& reader, const avro_schema_definition& writer); +compatibility_result check_compatible( + const avro_schema_definition& reader, + const avro_schema_definition& writer, + verbose is_verbose = verbose::no); } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/json.cc b/src/v/pandaproxy/schema_registry/json.cc index 
51311176248e..c72fbf812adb 100644 --- a/src/v/pandaproxy/schema_registry/json.cc +++ b/src/v/pandaproxy/schema_registry/json.cc @@ -1570,15 +1570,22 @@ ss::future make_canonical_json_schema( co_return schema; } -bool check_compatible( - const json_schema_definition& reader, const json_schema_definition& writer) { - // schemas might be using incompatible dialects - if (!check_compatible_dialects(reader().doc, writer().doc)) { - return false; - } - // reader is a superset of writer iff every schema that is valid for writer - // is also valid for reader - return is_superset(reader().doc, writer().doc); +compatibility_result check_compatible( + const json_schema_definition& reader, + const json_schema_definition& writer, + verbose is_verbose [[maybe_unused]]) { + auto is_compatible = [&]() { + // schemas might be using incompatible dialects + if (!check_compatible_dialects(reader().doc, writer().doc)) { + return false; + } + // reader is a superset of writer iff every schema that is valid for + // writer is also valid for reader + return is_superset(reader().doc, writer().doc); + }(); + + // TODO(gellert.nagy): start using the is_verbose flag in a follow up PR + return compatibility_result{.is_compat = is_compatible}; } } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/json.h b/src/v/pandaproxy/schema_registry/json.h index fa96a7ea9020..b288425209cf 100644 --- a/src/v/pandaproxy/schema_registry/json.h +++ b/src/v/pandaproxy/schema_registry/json.h @@ -22,7 +22,9 @@ make_json_schema_definition(sharded_store& store, canonical_schema schema); ss::future make_canonical_json_schema( sharded_store& store, unparsed_schema def, normalize norm = normalize::no); -bool check_compatible( - const json_schema_definition& reader, const json_schema_definition& writer); +compatibility_result check_compatible( + const json_schema_definition& reader, + const json_schema_definition& writer, + verbose is_verbose = verbose::no); } // namespace 
pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/protobuf.cc b/src/v/pandaproxy/schema_registry/protobuf.cc index d07a3c44f83f..d982b1af721f 100644 --- a/src/v/pandaproxy/schema_registry/protobuf.cc +++ b/src/v/pandaproxy/schema_registry/protobuf.cc @@ -689,11 +689,13 @@ struct compatibility_checker { } // namespace -bool check_compatible( +compatibility_result check_compatible( const protobuf_schema_definition& reader, - const protobuf_schema_definition& writer) { + const protobuf_schema_definition& writer, + verbose is_verbose [[maybe_unused]]) { compatibility_checker checker{reader(), writer()}; - return checker.check_compatible(); + // TODO(gellert.nagy): start using the is_verbose flag in a follow up PR + return compatibility_result{.is_compat = checker.check_compatible()}; } } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/protobuf.h b/src/v/pandaproxy/schema_registry/protobuf.h index edbc6aaf65c3..58c325064a47 100644 --- a/src/v/pandaproxy/schema_registry/protobuf.h +++ b/src/v/pandaproxy/schema_registry/protobuf.h @@ -25,8 +25,9 @@ validate_protobuf_schema(sharded_store& store, canonical_schema schema); ss::future make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema); -bool check_compatible( +compatibility_result check_compatible( const protobuf_schema_definition& reader, - const protobuf_schema_definition& writer); + const protobuf_schema_definition& writer, + verbose is_verbose = verbose::no); } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 4d24aafa29f5..1acaae781afb 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -24,6 +24,7 @@ #include "pandaproxy/schema_registry/protobuf.h" #include "pandaproxy/schema_registry/store.h" #include "pandaproxy/schema_registry/types.h" +#include 
"pandaproxy/schema_registry/util.h" #include #include @@ -49,13 +50,14 @@ ss::shard_id shard_for(schema_id id) { return jump_consistent_hash(id(), ss::smp::count); } -bool check_compatible(const valid_schema& reader, const valid_schema& writer) { - return reader.visit([&](const auto& reader) { - return writer.visit([&](const auto& writer) { +compatibility_result check_compatible( + const valid_schema& reader, const valid_schema& writer, verbose is_verbose) { + return reader.visit([&](const auto& reader) -> compatibility_result { + return writer.visit([&](const auto& writer) -> compatibility_result { if constexpr (std::is_same_v) { - return check_compatible(reader, writer); + return check_compatible(reader, writer, is_verbose); } - return false; + return {.is_compat = false}; }); }); } @@ -682,6 +684,18 @@ ss::future<> sharded_store::maybe_update_max_schema_id(schema_id id) { ss::future sharded_store::is_compatible( schema_version version, canonical_schema new_schema) { + auto rslt = co_await do_is_compatible( + version, std::move(new_schema), verbose::no); + co_return rslt.is_compat; +} + +ss::future sharded_store::is_compatible( + schema_version version, canonical_schema new_schema, verbose is_verbose) { + return do_is_compatible(version, std::move(new_schema), is_verbose); +} + +ss::future sharded_store::do_is_compatible( + schema_version version, canonical_schema new_schema, verbose is_verbose) { // Lookup the version_ids const auto sub = new_schema.sub(); const auto versions = co_await _store.invoke_on( @@ -707,16 +721,22 @@ ss::future sharded_store::is_compatible( auto old_schema = co_await get_subject_schema( sub, version, include_deleted::no); + // Lookup the compatibility level + auto compat = co_await get_compatibility(sub, default_to_global::yes); + // Types must always match if (old_schema.schema.type() != new_schema.type()) { - co_return false; + compatibility_result result{.is_compat = false}; + if (is_verbose) { + result.messages = { + "Incompatible 
because of different schema type", + fmt::format("{{compatibility: {}}}", compat)}; + } + co_return result; } - // Lookup the compatibility level - auto compat = co_await get_compatibility(sub, default_to_global::yes); - if (compat == compatibility_level::none) { - co_return true; + co_return compatibility_result{.is_compat = true}; } // Currently support JSON, PROTOBUF, AVRO @@ -742,8 +762,18 @@ ss::future sharded_store::is_compatible( auto new_valid = co_await make_valid_schema(std::move(new_schema)); - auto is_compat = true; - for (; is_compat && ver_it != versions.end(); ++ver_it) { + compatibility_result result{.is_compat = true}; + + auto formatter = [](std::string_view rdr, std::string_view wrtr) { + return [rdr, wrtr](std::string_view msg) { + return fmt::format( + fmt::runtime(msg), + fmt::arg("reader", rdr), + fmt::arg("writer", wrtr)); + }; + }; + + for (; result.is_compat && ver_it != versions.end(); ++ver_it) { if (ver_it->deleted) { continue; } @@ -753,22 +783,56 @@ ss::future sharded_store::is_compatible( auto old_valid = co_await make_valid_schema( std::move(old_schema.schema)); + std::vector version_messages; + if ( compat == compatibility_level::backward || compat == compatibility_level::backward_transitive || compat == compatibility_level::full || compat == compatibility_level::full_transitive) { - is_compat = is_compat && check_compatible(new_valid, old_valid); + auto r = check_compatible(new_valid, old_valid, is_verbose); + result.is_compat = result.is_compat && r.is_compat; + version_messages.reserve( + version_messages.size() + r.messages.size()); + std::transform( + std::make_move_iterator(r.messages.begin()), + std::make_move_iterator(r.messages.end()), + std::back_inserter(version_messages), + formatter("new", "old")); } if ( compat == compatibility_level::forward || compat == compatibility_level::forward_transitive || compat == compatibility_level::full || compat == compatibility_level::full_transitive) { - is_compat = is_compat && 
check_compatible(old_valid, new_valid); + auto r = check_compatible(old_valid, new_valid, is_verbose); + result.is_compat = result.is_compat && r.is_compat; + version_messages.reserve( + version_messages.size() + r.messages.size()); + std::transform( + std::make_move_iterator(r.messages.begin()), + std::make_move_iterator(r.messages.end()), + std::back_inserter(version_messages), + formatter("old", "new")); + } + + if (is_verbose && !result.is_compat) { + version_messages.emplace_back( + fmt::format("{{oldSchemaVersion: {}}}", old_schema.version)); + version_messages.emplace_back( + fmt::format("{{oldSchema: '{}'}}", to_string(old_valid.raw()))); + version_messages.emplace_back( + fmt::format("{{compatibility: {}}}", compat)); } + + result.messages.reserve( + result.messages.size() + version_messages.size()); + std::move( + version_messages.begin(), + version_messages.end(), + std::back_inserter(result.messages)); } - co_return is_compat; + co_return result; } void sharded_store::check_mode_mutability(force f) const { diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index bf9fc976f517..4ce9e5611d9a 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -178,12 +178,24 @@ class sharded_store { ss::future is_compatible(schema_version version, canonical_schema new_schema); + ///\brief Check if the provided schema is compatible with the subject and + /// version, according to the current compatibility, with the result + /// optionally accompanied by a vector of detailed error messages. + /// + /// If the compatibility level is transitive, then all versions are checked, + /// otherwise checks are against the version provided and newer. 
+ ss::future is_compatible( + schema_version version, canonical_schema new_schema, verbose is_verbose); + ss::future has_version(const subject&, schema_id, include_deleted); //// \brief Throw if the store is not mutable void check_mode_mutability(force f) const; private: + ss::future do_is_compatible( + schema_version version, canonical_schema new_schema, verbose is_verbose); + ss::future upsert_schema(schema_id id, canonical_schema_definition def); ss::future<> delete_schema(schema_id id); diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc b/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc index 78e2dc4e6ee6..596143c3f38c 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_avro.cc @@ -23,12 +23,13 @@ bool check_compatible( const pps::canonical_schema_definition& w) { pps::sharded_store s; return check_compatible( - pps::make_avro_schema_definition( - s, {pps::subject("r"), {r.shared_raw(), pps::schema_type::avro}}) - .get(), - pps::make_avro_schema_definition( - s, {pps::subject("w"), {w.shared_raw(), pps::schema_type::avro}}) - .get()); + pps::make_avro_schema_definition( + s, {pps::subject("r"), {r.shared_raw(), pps::schema_type::avro}}) + .get(), + pps::make_avro_schema_definition( + s, {pps::subject("w"), {w.shared_raw(), pps::schema_type::avro}}) + .get()) + .is_compat; } SEASTAR_THREAD_TEST_CASE(test_avro_type_promotion) { diff --git a/src/v/pandaproxy/schema_registry/test/test_json_schema.cc b/src/v/pandaproxy/schema_registry/test/test_json_schema.cc index 15b615866664..de8985a1d6cc 100644 --- a/src/v/pandaproxy/schema_registry/test/test_json_schema.cc +++ b/src/v/pandaproxy/schema_registry/test/test_json_schema.cc @@ -24,6 +24,12 @@ namespace pp = pandaproxy; namespace pps = pp::schema_registry; +bool check_compatible( + const pps::json_schema_definition& reader_schema, + const pps::json_schema_definition& writer_schema) { + return 
pps::check_compatible(reader_schema, writer_schema).is_compat; +} + struct store_fixture { store_fixture() { store.start(pps::is_mutable::yes, ss::default_smp_service_group()) @@ -979,14 +985,14 @@ SEASTAR_THREAD_TEST_CASE(test_compatibility_check) { try { // sanity check that each schema is compatible with itself BOOST_CHECK_MESSAGE( - pps::check_compatible( + ::check_compatible( make_json_schema(data.reader_schema), make_json_schema(data.reader_schema)), fmt::format( "reader '{}' should be compatible with itself", data.reader_schema)); BOOST_CHECK_MESSAGE( - pps::check_compatible( + ::check_compatible( make_json_schema(data.writer_schema), make_json_schema(data.writer_schema)), fmt::format( @@ -996,7 +1002,7 @@ SEASTAR_THREAD_TEST_CASE(test_compatibility_check) { // check compatibility (or not) reader->writer BOOST_CHECK_EQUAL( data.reader_is_compatible_with_writer, - pps::check_compatible( + ::check_compatible( make_json_schema(data.reader_schema), make_json_schema(data.writer_schema))); } catch (...) {