diff --git a/src/v/pandaproxy/schema_registry/avro.cc b/src/v/pandaproxy/schema_registry/avro.cc index 6c2ded7101b14..33ebbc9584e2f 100644 --- a/src/v/pandaproxy/schema_registry/avro.cc +++ b/src/v/pandaproxy/schema_registry/avro.cc @@ -367,8 +367,10 @@ result sanitize(json::Value::Array& a, json::MemoryPoolAllocator& alloc) { } // namespace -avro_schema_definition::avro_schema_definition(avro::ValidSchema vs) - : _impl(std::move(vs)) {} +avro_schema_definition::avro_schema_definition( + avro::ValidSchema vs, canonical_schema_definition::references refs) + : _impl(std::move(vs)) + , _refs(std::move(refs)) {} const avro::ValidSchema& avro_schema_definition::operator()() const { return _impl; @@ -422,7 +424,7 @@ ss::future collect_schema( collected_schema collected, ss::sstring name, canonical_schema schema) { - for (auto& ref : std::move(schema).refs()) { + for (auto& ref : schema.def().refs()) { if (!collected.contains(ref.name)) { auto ss = co_await store.get_subject_schema( std::move(ref.sub), ref.version, include_deleted::no); @@ -445,8 +447,10 @@ make_avro_schema_definition(sharded_store& store, canonical_schema schema) { auto name = schema.sub()(); auto refs = co_await collect_schema(store, {}, name, std::move(schema)); auto def = refs.flatten(); - co_return avro_schema_definition{avro::compileJsonSchemaFromMemory( - reinterpret_cast(def.data()), def.length())}; + co_return avro_schema_definition{ + avro::compileJsonSchemaFromMemory( + reinterpret_cast(def.data()), def.length()), + schema.def().refs()}; } catch (const avro::Exception& e) { ex = e; } @@ -493,7 +497,8 @@ sanitize_avro_schema_definition(unparsed_schema_definition def) { return canonical_schema_definition{ std::string_view{str_buf.GetString(), str_buf.GetSize()}, - schema_type::avro}; + schema_type::avro, + def.refs()}; } bool check_compatible( diff --git a/src/v/pandaproxy/schema_registry/protobuf.cc b/src/v/pandaproxy/schema_registry/protobuf.cc index 32f02399743ba..1f2bd0683f83b 100644 --- a/src/v/pandaproxy/schema_registry/protobuf.cc +++ b/src/v/pandaproxy/schema_registry/protobuf.cc @@ -273,7 +273,7 @@ build_file(pb::DescriptorPool& dp, const pb::FileDescriptorProto& fdp) { /// on stack unwind. ss::future build_file_with_refs( pb::DescriptorPool& dp, sharded_store& store, canonical_schema schema) { - for (const auto& ref : schema.refs()) { + for (const auto& ref : schema.def().refs()) { if (dp.FindFileByName(ref.name)) { continue; } @@ -282,10 +282,7 @@ ss::future build_file_with_refs( co_await build_file_with_refs( dp, store, - canonical_schema{ - subject{ref.name}, - std::move(dep.schema).def(), - std::move(dep.schema).refs()}); + canonical_schema{subject{ref.name}, std::move(dep.schema).def()}); } parser p; @@ -345,8 +342,9 @@ operator<<(std::ostream& os, const protobuf_schema_definition& def) { ss::future make_protobuf_schema_definition(sharded_store& store, canonical_schema schema) { auto impl = ss::make_shared(); + auto refs = schema.def().refs(); impl->fd = co_await import_schema(impl->_dp, store, std::move(schema)); - co_return protobuf_schema_definition{std::move(impl)}; + co_return protobuf_schema_definition{std::move(impl), std::move(refs)}; } ss::future @@ -362,12 +360,11 @@ make_canonical_protobuf_schema(sharded_store& store, unparsed_schema schema) { canonical_schema temp{ std::move(schema).sub(), {canonical_schema_definition::raw_string{schema.def().raw()()}, - schema.def().type()}, - std::move(schema).refs()}; + schema.def().type(), + schema.def().refs()}}; auto validated = co_await validate_protobuf_schema(store, temp); - co_return canonical_schema{ - std::move(temp).sub(), std::move(validated), std::move(temp).refs()}; + co_return canonical_schema{std::move(temp).sub(), std::move(validated)}; // NOLINTEND(bugprone-use-after-move) } diff --git a/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h b/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h index 18aa3e44266a3..23da030df1766 100644 --- a/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h +++ b/src/v/pandaproxy/schema_registry/requests/get_subject_versions_version.h @@ -37,10 +37,10 @@ inline void rjson_serialize( w.Key("schemaType"); ::json::rjson_serialize(w, to_string_view(type)); } - if (!res.schema.refs().empty()) { + if (!res.schema.def().refs().empty()) { w.Key("references"); w.StartArray(); - for (const auto& ref : res.schema.refs()) { + for (const auto& ref : res.schema.def().refs()) { w.StartObject(); w.Key("name"); ::json::rjson_serialize(w, ref.name); diff --git a/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h b/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h index 1e83190fcc595..ff99f0419aa7c 100644 --- a/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h +++ b/src/v/pandaproxy/schema_registry/requests/post_subject_versions.h @@ -49,7 +49,7 @@ class post_subject_versions_request_handler subject sub{invalid_subject}; unparsed_schema_definition::raw_string def; schema_type type{schema_type::avro}; - unparsed_schema::references refs; + unparsed_schema_definition::references refs; }; mutable_schema _schema; @@ -207,8 +207,7 @@ class post_subject_versions_request_handler _state = state::empty; result.def = { std::move(_schema.sub), - {std::move(_schema.def), _schema.type}, - std::move(_schema.refs)}; + {std::move(_schema.def), _schema.type, std::move(_schema.refs)}}; return true; } case state::reference: { diff --git a/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc b/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc index 996b7b71aac13..c944adb22cc4c 100644 --- a/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc +++ b/src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc @@ -23,18 +23,14 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_version_response) { R"({\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"type\":\"string\",\"name\":\"field1\"},{\"type\":\"com.acme.Referenced\",\"name\":\"int\"}]})"}; const pps::canonical_schema_definition schema_def{ R"({"type":"record","name":"test","fields":[{"type":"string","name":"field1"},{"type":"com.acme.Referenced","name":"int"}]})", - pps::schema_type::avro}; + pps::schema_type::avro, + {{{"com.acme.Referenced"}, + pps::subject{"childSubject"}, + pps::schema_version{1}}}}; const pps::subject sub{"imported-ref"}; pps::post_subject_versions_version_response response{ - .schema{ - pps::subject{"imported-ref"}, - schema_def, - {{{"com.acme.Referenced"}, - pps::subject{"childSubject"}, - pps::schema_version{1}}}}, - .id{12}, - .version{2}}; + .schema{pps::subject{"imported-ref"}, schema_def}, .id{12}, .version{2}}; const ss::sstring expected{ R"( diff --git a/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc b/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc index 21ba6198da026..676e983a4ef1b 100644 --- a/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc +++ b/src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc @@ -29,7 +29,10 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) { R"({\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"type\":\"string\",\"name\":\"field1\"},{\"type\":\"com.acme.Referenced\",\"name\":\"int\"}]})"}; const pps::unparsed_schema_definition expected_schema_def{ R"({"type":"record","name":"test","fields":[{"type":"string","name":"field1"},{"type":"com.acme.Referenced","name":"int"}]})", - pps::schema_type::avro}; + pps::schema_type::avro, + {{.name{"com.acme.Referenced"}, + .sub{pps::subject{"childSubject"}}, + .version{pps::schema_version{1}}}}}; const ss::sstring payload{ R"( @@ -47,13 +50,7 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) { })"}; const pps::subject sub{"test_subject"}; const parse_result expected{ - {sub, - expected_schema_def, - {{.name{"com.acme.Referenced"}, - .sub{pps::subject{"childSubject"}}, - .version{pps::schema_version{1}}}}}, - std::nullopt, - std::nullopt}; + {sub, expected_schema_def}, std::nullopt, std::nullopt}; auto result{ppj::rjson_parse( payload.data(), pps::post_subject_versions_request_handler{sub})}; @@ -63,8 +60,9 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) { result.def = { std::move(result.def).sub(), pps::unparsed_schema_definition{ - ppj::minify(result.def.def().raw()()), pps::schema_type::avro}, - std::move(result.def).refs()}; + ppj::minify(result.def.def().raw()()), + pps::schema_type::avro, + std::move(result.def).def().refs()}}; // NOLINTEND(bugprone-use-after-move) BOOST_REQUIRE_EQUAL(expected.def, result.def); diff --git a/src/v/pandaproxy/schema_registry/sharded_store.cc b/src/v/pandaproxy/schema_registry/sharded_store.cc index 67dd4c29c462e..cdb886eb13329 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/sharded_store.cc @@ -72,8 +72,7 @@ sharded_store::make_canonical_schema(unparsed_schema schema) { case schema_type::avro: { co_return canonical_schema{ std::move(schema.sub()), - sanitize_avro_schema_definition(schema.def()).value(), - std::move(schema.refs())}; + sanitize_avro_schema_definition(schema.def()).value()}; } case schema_type::protobuf: co_return co_await make_canonical_protobuf_schema( @@ -220,8 +219,6 @@ ss::future sharded_store::upsert( marker, // NOLINTNEXTLINE(bugprone-use-after-move) std::move(schema).sub(), - // NOLINTNEXTLINE(bugprone-use-after-move) - std::move(schema).refs(), version, id, deleted); @@ -301,7 +298,7 @@ ss::future sharded_store::get_subject_schema( }); co_return subject_schema{ - .schema = {sub, std::move(def), std::move(v_id.refs)}, + .schema = {sub, std::move(def)}, .version = v_id.version, .id = v_id.id, .deleted = v_id.deleted}; @@ -330,10 +327,27 @@ sharded_store::get_versions(subject sub, include_deleted inc_del) { } ss::future sharded_store::is_referenced(subject sub, schema_version ver) { - auto map = [sub{std::move(sub)}, ver](store& s) { - return s.is_referenced(sub, ver); - }; - co_return co_await _store.map_reduce0(map, false, std::logical_or<>{}); + // Find all the schema that reference this sub-ver + auto references = co_await _store.map_reduce0( + [sub{std::move(sub)}, ver](store& s) { + return s.referenced_by(sub, ver); + }, + std::vector{}, + [](std::vector acc, std::vector refs) { + acc.insert(acc.end(), refs.begin(), refs.end()); + return acc; + }); + std::sort(references.begin(), references.end()); + auto end = std::unique(references.begin(), references.end()); + references.resize(std::distance(references.begin(), end)); + + // Find whether any subject version reference any of the schema + co_return co_await _store.map_reduce0( + [refs{std::move(references)}](store& s) { + return s.subject_versions_has_any_of(refs); + }, + false, + std::logical_or<>{}); } ss::future> sharded_store::referenced_by( @@ -348,16 +362,34 @@ ss::future> sharded_store::referenced_by( ver = versions.back(); } - auto map = [sub{std::move(sub)}, ver](store& s) { - return s.referenced_by(sub, ver); - }; - auto reduce = [](std::vector acc, std::vector refs) { - acc.insert(acc.end(), refs.begin(), refs.end()); - return acc; - }; + // Find all the schema that reference this sub-ver auto references = co_await _store.map_reduce0( - map, std::vector{}, reduce); - std::sort(references.begin(), references.end()); + [sub{std::move(sub)}, ver](store& s) { + return s.referenced_by(sub, ver); + }, + std::vector{}, + [](std::vector acc, std::vector refs) { + acc.insert(acc.end(), refs.begin(), refs.end()); + return acc; + }); + absl::c_sort(references); + auto end = std::unique(references.begin(), references.end()); + references.resize(std::distance(references.begin(), end)); + + // Find all the subject versions that reference any of the schema + references = co_await _store.map_reduce0( + [refs{std::move(references)}](store& s) { + return s.subject_versions_with_any_of(refs); + }, + std::vector{}, + [](std::vector acc, std::vector refs) { + acc.insert(acc.end(), refs.begin(), refs.end()); + return acc; + }); + + absl::c_sort(references); + end = std::unique(references.begin(), references.end()); + references.resize(std::distance(references.begin(), end)); co_return references; } @@ -464,14 +496,12 @@ sharded_store::upsert_schema(schema_id id, canonical_schema_definition def) { }); } -ss::future sharded_store::insert_subject( - subject sub, canonical_schema::references refs, schema_id id) { +ss::future +sharded_store::insert_subject(subject sub, schema_id id) { auto sub_shard{shard_for(sub)}; auto [version, inserted] = co_await _store.invoke_on( - sub_shard, - _smp_opts, - [sub{std::move(sub)}, refs{std::move(refs)}, id](store& s) mutable { - return s.insert_subject(sub, std::move(refs), id); + sub_shard, _smp_opts, [sub{std::move(sub)}, id](store& s) mutable { + return s.insert_subject(sub, id); }); co_return insert_subject_result{version, inserted}; } @@ -479,7 +509,6 @@ ss::future sharded_store::insert_subject( ss::future sharded_store::upsert_subject( seq_marker marker, subject sub, - canonical_schema::references refs, schema_version version, schema_id id, is_deleted deleted) { @@ -487,14 +516,8 @@ ss::future sharded_store::upsert_subject( co_return co_await _store.invoke_on( sub_shard, _smp_opts, - [marker, - sub{std::move(sub)}, - refs{std::move(refs)}, - version, - id, - deleted](store& s) mutable { - return s.upsert_subject( - marker, std::move(sub), std::move(refs), version, id, deleted); + [marker, sub{std::move(sub)}, version, id, deleted](store& s) mutable { + return s.upsert_subject(marker, std::move(sub), version, id, deleted); }); } diff --git a/src/v/pandaproxy/schema_registry/sharded_store.h b/src/v/pandaproxy/schema_registry/sharded_store.h index 39c1f46a57677..99f8cb0099a41 100644 --- a/src/v/pandaproxy/schema_registry/sharded_store.h +++ b/src/v/pandaproxy/schema_registry/sharded_store.h @@ -142,13 +142,11 @@ class sharded_store { schema_version version; bool inserted; }; - ss::future insert_subject( - subject sub, canonical_schema::references refs, schema_id id); + ss::future insert_subject(subject sub, schema_id id); ss::future upsert_subject( seq_marker marker, subject sub, - canonical_schema::references refs, schema_version version, schema_id id, is_deleted deleted); diff --git a/src/v/pandaproxy/schema_registry/storage.h b/src/v/pandaproxy/schema_registry/storage.h index c8915899493c8..38aa1121930db 100644 --- a/src/v/pandaproxy/schema_registry/storage.h +++ b/src/v/pandaproxy/schema_registry/storage.h @@ -336,10 +336,10 @@ inline void rjson_serialize( w.Key("schemaType"); ::json::rjson_serialize(w, to_string_view(type)); } - if (!val.schema.refs().empty()) { + if (!val.schema.def().refs().empty()) { w.Key("references"); w.StartArray(); - for (const auto& ref : val.schema.refs()) { + for (const auto& ref : val.schema.def().refs()) { w.StartObject(); w.Key("name"); ::json::rjson_serialize(w, ref.name); @@ -381,7 +381,7 @@ class schema_value_handler final : public json::base_handler { subject sub{invalid_subject}; typename typed_schema_definition::raw_string def; schema_type type{schema_type::avro}; - typename typed_schema::references refs; + typename typed_schema_definition::references refs; }; mutable_schema _schema; @@ -572,8 +572,7 @@ class schema_value_handler final : public json::base_handler { _state = state::empty; result.schema = { std::move(_schema.sub), - {std::move(_schema.def), _schema.type}, - std::move(_schema.refs)}; + {std::move(_schema.def), _schema.type, std::move(_schema.refs)}}; return true; } case state::reference: { diff --git a/src/v/pandaproxy/schema_registry/store.h b/src/v/pandaproxy/schema_registry/store.h index 910f4c1dad91e..f57a411b9edd8 100644 --- a/src/v/pandaproxy/schema_registry/store.h +++ b/src/v/pandaproxy/schema_registry/store.h @@ -16,26 +16,24 @@ #include "pandaproxy/schema_registry/errors.h" #include "pandaproxy/schema_registry/types.h" +#include #include #include +#include + namespace pandaproxy::schema_registry { ///\brief A mapping of version and schema id for a subject. struct subject_version_entry { subject_version_entry( - schema_version version, - schema_id id, - canonical_schema::references refs, - is_deleted deleted) + schema_version version, schema_id id, is_deleted deleted) : version{version} , id{id} - , refs{std::move(refs)} , deleted(deleted) {} schema_version version; schema_id id; - canonical_schema::references refs; is_deleted deleted{is_deleted::no}; std::vector written_at; @@ -74,8 +72,7 @@ class store { /// return the schema_version and schema_id, and whether it's new. insert_result insert(canonical_schema schema) { auto id = insert_schema(std::move(schema).def()).id; - auto [version, inserted] = insert_subject( - std::move(schema).sub(), std::move(schema).refs(), id); + auto [version, inserted] = insert_subject(std::move(schema).sub(), id); return {version, id, inserted}; } @@ -149,7 +146,7 @@ class store { auto def = BOOST_OUTCOME_TRYX(get_schema_definition(v_id.id)); return subject_schema{ - .schema = {sub, std::move(def), std::move(v_id.refs)}, + .schema = {sub, std::move(def)}, .version = v_id.version, .id = v_id.id, .deleted = v_id.deleted}; @@ -308,38 +305,40 @@ class store { }); } - bool is_referenced(const subject& sub, schema_version ver) { - return std::any_of( - _subjects.begin(), _subjects.end(), [&sub, ver](const auto& s) { - return std::any_of( - s.second.versions.begin(), - s.second.versions.end(), - [&sub, ver](const auto& v) { - return std::any_of( - v.refs.begin(), - v.refs.end(), - [&sub, ver](const auto& ref) { - return ref.sub == sub && ref.version == ver; - }); - }); - }); - } - std::vector referenced_by(const subject& sub, schema_version ver) { std::vector references; - for (const auto& s : _subjects) { - for (const auto& v : s.second.versions) { - for (const auto& r : v.refs) { - if (r.sub == sub && r.version == ver) { - references.emplace_back(v.id); - } + for (const auto& s : _schemas) { + for (const auto& r : s.second.definition.refs()) { + if (r.sub == sub && r.version == ver) { + references.emplace_back(s.first); } } } return references; } + std::vector + subject_versions_with_any_of(const std::vector& ids) { + std::vector has_ids; + for (const auto& s : _subjects) { + for (const auto& r : s.second.versions) { + if (absl::c_binary_search(ids, r.id)) { + has_ids.push_back(r.id); + } + } + } + return has_ids; + } + + bool subject_versions_has_any_of(const std::vector& ids) { + return absl::c_any_of(_subjects, [&ids](const auto& s) { + return absl::c_any_of(s.second.versions, [&ids](const auto& v) { + return absl::c_binary_search(ids, v.id); + }); + }); + } + ///\brief Delete a subject. result> delete_subject( seq_marker marker, const subject& sub, permanent_delete permanent) { @@ -485,8 +484,7 @@ class store { schema_version version; bool inserted; }; - insert_subject_result insert_subject( - subject sub, canonical_schema::references refs, schema_id id) { + insert_subject_result insert_subject(subject sub, schema_id id) { auto& subject_entry = _subjects[std::move(sub)]; subject_entry.deleted = is_deleted::no; auto& versions = subject_entry.versions; @@ -501,14 +499,13 @@ class store { const auto version = versions.empty() ? schema_version{1} : versions.back().version + 1; - versions.emplace_back(version, id, std::move(refs), is_deleted::no); + versions.emplace_back(version, id, is_deleted::no); return {version, true}; } bool upsert_subject( seq_marker marker, subject sub, - canonical_schema::references refs, schema_version version, schema_id id, is_deleted deleted) { @@ -526,10 +523,9 @@ class store { const bool found = v_it != versions.end() && v_it->version == version; if (found) { - *v_it = subject_version_entry( - version, id, std::move(refs), deleted); + *v_it = subject_version_entry(version, id, deleted); } else { - versions.emplace(v_it, version, id, std::move(refs), deleted); + versions.emplace(v_it, version, id, deleted); } const auto all_deleted = is_deleted( diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc index 84c3ca6829264..caf110c897785 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc +++ b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc @@ -19,6 +19,8 @@ #include +#include + namespace pp = pandaproxy; namespace pps = pp::schema_registry; @@ -63,16 +65,14 @@ bool check_compatible( store.insert( pandaproxy::schema_registry::canonical_schema{ pps::subject{"sub"}, - pps::canonical_schema_definition{writer, pps::schema_type::protobuf}, - {}}, + pps::canonical_schema_definition{writer, pps::schema_type::protobuf}}, pps::schema_version{1}); return store.store .is_compatible( pps::schema_version{1}, pps::canonical_schema{ pps::subject{"sub"}, - pps::canonical_schema_definition{reader, pps::schema_type::protobuf}, - {}}) + pps::canonical_schema_definition{reader, pps::schema_type::protobuf}}) .get(); } @@ -116,7 +116,8 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_not_referenced) { simple_sharded_store store; auto schema1 = pps::canonical_schema{pps::subject{"simple"}, simple}; - auto schema2 = pps::canonical_schema{pps::subject{"imported"}, imported}; + auto schema2 = pps::canonical_schema{ + pps::subject{"imported"}, imported_no_ref}; store.insert(schema1, pps::schema_version{1}); @@ -135,13 +136,9 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_referenced) { auto schema1 = pps::canonical_schema{pps::subject{"simple.proto"}, simple}; auto schema2 = pps::canonical_schema{ - pps::subject{"imported.proto"}, - imported, - {{"simple", pps::subject{"simple.proto"}, pps::schema_version{1}}}}; + pps::subject{"imported.proto"}, imported}; auto schema3 = pps::canonical_schema{ - pps::subject{"imported-again.proto"}, - imported_again, - {{"imported", pps::subject{"imported.proto"}, pps::schema_version{1}}}}; + pps::subject{"imported-again.proto"}, imported_again}; store.insert(schema1, pps::schema_version{1}); store.insert(schema2, pps::schema_version{1}); @@ -160,14 +157,9 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_recursive_reference) { auto schema1 = pps::canonical_schema{pps::subject{"simple.proto"}, simple}; auto schema2 = pps::canonical_schema{ - pps::subject{"imported.proto"}, - imported, - {{"simple", pps::subject{"simple.proto"}, pps::schema_version{1}}}}; + pps::subject{"imported.proto"}, imported}; auto schema3 = pps::canonical_schema{ - pps::subject{"imported-twice.proto"}, - imported_twice, - {{"simple", pps::subject{"simple.proto"}, pps::schema_version{1}}, - {"imported", pps::subject{"imported.proto"}, pps::schema_version{1}}}}; + pps::subject{"imported-twice.proto"}, imported_twice}; store.insert(schema1, pps::schema_version{1}); store.insert(schema2, pps::schema_version{1}); diff --git a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.h b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.h index e3c158b68387c..8dc490e622bfd 100644 --- a/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.h +++ b/src/v/pandaproxy/schema_registry/test/compatibility_protobuf.h @@ -23,7 +23,7 @@ message Simple { })", pps::schema_type::protobuf}; -const auto imported = pps::canonical_schema_definition{ +const auto imported_no_ref = pps::canonical_schema_definition{ R"( syntax = "proto3"; @@ -34,6 +34,18 @@ message Test2 { })", pps::schema_type::protobuf}; +const auto imported = pps::canonical_schema_definition{ + R"( +syntax = "proto3"; + +import "simple"; + +message Test2 { + Simple id = 1; +})", + pps::schema_type::protobuf, + {{"simple", pps::subject{"simple.proto"}, pps::schema_version{1}}}}; + const auto imported_again = pps::canonical_schema_definition{ R"( syntax = "proto3"; @@ -43,7 +55,8 @@ import "imported"; message Test3 { Test2 id = 1; })", - pps::schema_type::protobuf}; + pps::schema_type::protobuf, + {{"imported", pps::subject{"imported.proto"}, pps::schema_version{1}}}}; const auto imported_twice = pps::canonical_schema_definition{ R"( @@ -55,7 +68,9 @@ import "imported"; message Test3 { Test2 id = 1; })", - pps::schema_type::protobuf}; + pps::schema_type::protobuf, + {{"simple", pps::subject{"simple.proto"}, pps::schema_version{1}}, + {"imported", pps::subject{"imported.proto"}, pps::schema_version{1}}}}; const auto nested = pps::canonical_schema_definition{ R"( diff --git a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc index 2e188bdd7c64d..7fb28f1303f64 100644 --- a/src/v/pandaproxy/schema_registry/test/consume_to_store.cc +++ b/src/v/pandaproxy/schema_registry/test/consume_to_store.cc @@ -44,7 +44,9 @@ constexpr pps::schema_id id1{1}; const pps::canonical_schema_definition string_def0{ pps::sanitize_avro_schema_definition( - {R"({"type":"string"})", pps::schema_type::avro}) + {R"({"type":"string"})", + pps::schema_type::avro, + {{.name{"ref"}, .sub{subject0}, .version{version0}}}}) .value()}; const pps::canonical_schema_definition int_def0{ pps::sanitize_avro_schema_definition( @@ -120,12 +122,9 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store) { .get(); BOOST_REQUIRE_EQUAL(s_res.schema.def(), string_def0); - pps::canonical_schema::references refs{ - {.name{"ref"}, .sub{subject0}, .version{version0}}}; auto good_schema_ref_1 = pps::as_record_batch( pps::schema_key{sequence, node_id, subject0, version1, magic1}, - pps::canonical_schema_value{ - {subject0, string_def0, refs}, version1, id1}); + pps::canonical_schema_value{{subject0, string_def0}, version1, id1}); BOOST_REQUIRE_NO_THROW(c(good_schema_ref_1.copy()).get()); auto s_ref_res = s.get_subject_schema( @@ -135,10 +134,10 @@ SEASTAR_THREAD_TEST_CASE(test_consume_to_store) { BOOST_REQUIRE_EQUAL(s_ref_res.schema.sub(), subject0); BOOST_REQUIRE_EQUAL(s_ref_res.id, id1); BOOST_REQUIRE_EQUAL_COLLECTIONS( - s_ref_res.schema.refs().begin(), - s_ref_res.schema.refs().end(), - refs.begin(), - refs.end()); + s_ref_res.schema.def().refs().begin(), + s_ref_res.schema.def().refs().end(), + string_def0.refs().begin(), + string_def0.refs().end()); auto bad_schema_magic = pps::as_record_batch( pps::schema_key{sequence, node_id, subject0, version0, magic2}, diff --git a/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc b/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc index c314345cce7d4..77a8094d07b6c 100644 --- a/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc +++ b/src/v/pandaproxy/schema_registry/test/post_subjects_subject_version.cc @@ -35,10 +35,10 @@ void rjson_serialize( w.Key("schemaType"); ::json::rjson_serialize(w, to_string_view(r.schema.type())); } - if (!r.schema.refs().empty()) { + if (!r.schema.def().refs().empty()) { w.Key("references"); w.StartArray(); - for (const auto& ref : r.schema.refs()) { + for (const auto& ref : r.schema.def().refs()) { w.StartObject(); w.Key("name"); ::json::rjson_serialize(w, ref.name); @@ -539,7 +539,7 @@ FIXTURE_TEST(schema_registry_post_avro_references, pandaproxy_test_fixture) { const auto employee_req = request{pps::canonical_schema{ pps::subject{"employee-value"}, - pps::canonical_schema_definition( + pps::canonical_schema_definition{ R"({ "namespace": "com.redpanda", "type": "record", @@ -559,10 +559,10 @@ FIXTURE_TEST(schema_registry_post_avro_references, pandaproxy_test_fixture) { } ] })", - pps::schema_type::avro), - {{"com.redpanda.company", - pps::subject{"company-value"}, - pps::schema_version{1}}}}}; + pps::schema_type::avro, + {{"com.redpanda.company", + pps::subject{"company-value"}, + pps::schema_version{1}}}}}}; info("Connecting client"); auto client = make_schema_reg_client(); diff --git a/src/v/pandaproxy/schema_registry/test/sharded_store.cc b/src/v/pandaproxy/schema_registry/test/sharded_store.cc index 69bb843b1d9b1..5ff505709145d 100644 --- a/src/v/pandaproxy/schema_registry/test/sharded_store.cc +++ b/src/v/pandaproxy/schema_registry/test/sharded_store.cc @@ -45,9 +45,7 @@ SEASTAR_THREAD_TEST_CASE(test_sharded_store_referenced_by) { // Insert referenced auto importing_schema = pps::canonical_schema{ - pps::subject{"imported.proto"}, - imported, - {{"simple", pps::subject{"simple.proto"}, ver1}}}; + pps::subject{"imported.proto"}, imported}; store .upsert( diff --git a/src/v/pandaproxy/schema_registry/test/storage.cc b/src/v/pandaproxy/schema_registry/test/storage.cc index 86218aefd1e5d..1bbe5391bf36f 100644 --- a/src/v/pandaproxy/schema_registry/test/storage.cc +++ b/src/v/pandaproxy/schema_registry/test/storage.cc @@ -53,8 +53,9 @@ const pps::canonical_schema_value avro_schema_value{ .schema{ pps::subject{"my-kafka-value"}, pps::canonical_schema_definition{ - R"({"type":"string"})", pps::schema_type::avro}, - {{{"name"}, pps::subject{"subject"}, pps::schema_version{1}}}}, + R"({"type":"string"})", + pps::schema_type::avro, + {{{"name"}, pps::subject{"subject"}, pps::schema_version{1}}}}}, .version{pps::schema_version{1}}, .id{pps::schema_id{1}}, .deleted = pps::is_deleted::yes}; diff --git a/src/v/pandaproxy/schema_registry/test/store.cc b/src/v/pandaproxy/schema_registry/test/store.cc index 8cfcb568bda47..39539ddbff803 100644 --- a/src/v/pandaproxy/schema_registry/test/store.cc +++ b/src/v/pandaproxy/schema_registry/test/store.cc @@ -78,7 +78,7 @@ bool upsert( pps::is_deleted deleted) { store.upsert_schema(id, std::move(def)); return store.upsert_subject( - pps::seq_marker{}, std::move(sub), {}, version, id, deleted); + pps::seq_marker{}, std::move(sub), version, id, deleted); } BOOST_AUTO_TEST_CASE(test_store_upsert_in_order) { @@ -236,7 +236,6 @@ BOOST_AUTO_TEST_CASE(test_store_get_schema_subject_versions) { s.upsert_subject( dummy_marker, subject0, - {}, pps::schema_version{1}, pps::schema_id{1}, pps::is_deleted::yes); @@ -599,7 +598,6 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject_version) { BOOST_REQUIRE_NO_THROW(s.upsert_subject( dummy_marker, subject0, - {}, pps::schema_version{1}, pps::schema_id{1}, pps::is_deleted::yes)); @@ -665,7 +663,6 @@ BOOST_AUTO_TEST_CASE(test_store_subject_version_latest) { BOOST_REQUIRE_NO_THROW(s.upsert_subject( dummy_marker, subject0, - {}, pps::schema_version{2}, pps::schema_id{1}, pps::is_deleted::yes)); @@ -700,7 +697,6 @@ BOOST_AUTO_TEST_CASE(test_store_delete_subject_after_delete_version) { s.upsert_subject( dummy_marker, subject0, - {}, pps::schema_version{1}, pps::schema_id{1}, pps::is_deleted::yes); diff --git a/src/v/pandaproxy/schema_registry/types.cc b/src/v/pandaproxy/schema_registry/types.cc index 69ecb8dbec49b..a53d52b2bf3d6 100644 --- a/src/v/pandaproxy/schema_registry/types.cc +++ b/src/v/pandaproxy/schema_registry/types.cc @@ -44,7 +44,11 @@ std::ostream& operator<<( std::ostream& os, const typed_schema_definition& def) { fmt::print( - os, "type: {}, definition: {}", to_string_view(def.type()), def.raw()); + os, + "type: {}, definition: {}, references: {}", + to_string_view(def.type()), + def.raw(), + def.refs()); return os; } @@ -52,7 +56,11 @@ std::ostream& operator<<( std::ostream& os, const typed_schema_definition& def) { fmt::print( - os, "type: {}, definition: {}", to_string_view(def.type()), def.raw()); + os, + "type: {}, definition: {}, references: {}", + to_string_view(def.type()), + def.raw(), + def.refs()); return os; } @@ -63,14 +71,12 @@ std::ostream& operator<<(std::ostream& os, const schema_reference& ref) { } std::ostream& operator<<(std::ostream& os, const unparsed_schema& ref) { - fmt::print( - os, "subject: {}, {}, references: {}", ref.sub(), ref.def(), ref.refs()); + fmt::print(os, "subject: {}, {}", ref.sub(), ref.def()); return os; } std::ostream& operator<<(std::ostream& os, const canonical_schema& ref) { - fmt::print( - os, "subject: {}, {}, references: {}", ref.sub(), ref.def(), ref.refs()); + fmt::print(os, "subject: {}, {}", ref.sub(), ref.def()); return os; } diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index 619d60021b4e7..f2f290a2b8174 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -93,11 +93,19 @@ class typed_schema_definition { public: using tag = Tag; using raw_string = named_type; + using references = std::vector; template typed_schema_definition(T&& def, schema_type type) : _def{ss::sstring{std::forward(def)}} - , _type{type} {} + , _type{type} + , _refs{} {} + + template + typed_schema_definition(T&& def, schema_type type, references refs) + : _def{ss::sstring{std::forward(def)}} + , _type{type} + , _refs{std::move(refs)} {} friend bool operator==( const typed_schema_definition& lhs, const typed_schema_definition& rhs) @@ -111,9 +119,13 @@ class typed_schema_definition { const raw_string& raw() const& { return _def; } raw_string raw() && { return std::move(_def); } + const references& refs() const& { return _refs; } + references refs() && { return std::move(_refs); } + private: raw_string _def; schema_type _type{schema_type::avro}; + references _refs; }; ///\brief An unvalidated definition of the schema and its type. @@ -135,9 +147,13 @@ static const unparsed_schema_definition invalid_schema_definition{ ///\brief The definition of an avro schema. class avro_schema_definition { public: - explicit avro_schema_definition(avro::ValidSchema vs); + explicit avro_schema_definition( + avro::ValidSchema vs, canonical_schema_definition::references refs); canonical_schema_definition::raw_string raw() const; + canonical_schema_definition::references const& refs() const { + return _refs; + }; const avro::ValidSchema& operator()() const; @@ -157,6 +173,7 @@ class avro_schema_definition { private: avro::ValidSchema _impl; + canonical_schema_definition::references _refs; }; class protobuf_schema_definition { @@ -164,10 +181,15 @@ class protobuf_schema_definition { struct impl; using pimpl = ss::shared_ptr; - explicit protobuf_schema_definition(pimpl p) - : _impl{std::move(p)} {} + explicit protobuf_schema_definition( + pimpl p, canonical_schema_definition::references refs) + : _impl{std::move(p)} + , _refs(std::move(refs)) {} canonical_schema_definition::raw_string raw() const; + canonical_schema_definition::references const& refs() const { + return _refs; + }; const impl& operator()() const { return *_impl; } @@ -181,7 +203,7 @@ class protobuf_schema_definition { constexpr schema_type type() const { return schema_type::protobuf; } explicit operator canonical_schema_definition() const { - return {raw(), type()}; + return {raw(), type(), refs()}; } ::result @@ -189,6 +211,7 @@ class protobuf_schema_definition { private: pimpl _impl; + canonical_schema_definition::references _refs; }; ///\brief A schema that has been validated. @@ -301,19 +324,12 @@ class typed_schema { using tag = Tag; using schema_definition = typed_schema_definition; - using references = std::vector; - typed_schema() = default; typed_schema(subject sub, schema_definition def) : _sub{std::move(sub)} , _def{std::move(def)} {} - typed_schema(subject sub, schema_definition def, references refs) - : _sub{std::move(sub)} - , _def{std::move(def)} - , _refs{std::move(refs)} {} - friend bool operator==(const typed_schema& lhs, const typed_schema& rhs) = default; @@ -327,13 +343,9 @@ class typed_schema { const schema_definition& def() const& { return _def; } schema_definition def() && { return std::move(_def); } - const references& refs() const& { return _refs; } - references refs() && { return std::move(_refs); } - private: subject _sub{invalid_subject}; schema_definition _def{"", schema_type::avro}; - references _refs; }; using unparsed_schema = typed_schema;