Skip to content

Commit

Permalink
schema_registry/types: Support extracting a name
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed May 30, 2023
1 parent 25d55a8 commit 5a76733
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/avro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,10 @@ canonical_schema_definition::raw_string avro_schema_definition::raw() const {
return canonical_schema_definition::raw_string{_impl.toJson(false)};
}

ss::sstring avro_schema_definition::name() const {
return _impl.root()->name().fullname();
};

class collected_schema {
public:
bool contains(const ss::sstring& name) const {
Expand Down
17 changes: 17 additions & 0 deletions src/v/pandaproxy/schema_registry/protobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "pandaproxy/schema_registry/protobuf.h"

#include "kafka/protocol/errors.h"
#include "pandaproxy/logger.h"
#include "pandaproxy/schema_registry/errors.h"
#include "pandaproxy/schema_registry/sharded_store.h"
Expand Down Expand Up @@ -312,6 +313,22 @@ protobuf_schema_definition::raw() const {
return canonical_schema_definition::raw_string{_impl->fd->DebugString()};
}

::result<ss::sstring, kafka::error_code>
protobuf_schema_definition::name(std::vector<int> const& fields) const {
if (fields.empty()) {
return kafka::error_code::invalid_record;
}
auto f = fields.begin();
auto d = _impl->fd->message_type(*f++);
while (fields.end() != f && d) {
d = d->nested_type(*f++);
}
if (!d) {
return kafka::error_code::invalid_record;
}
return d->full_name();
}

bool operator==(
const protobuf_schema_definition& lhs,
const protobuf_schema_definition& rhs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ SEASTAR_THREAD_TEST_CASE(test_avro_schema_definition) {
"schema2 is an avro_schema_definition");
pps::canonical_schema_definition avro_conversion{valid};
BOOST_CHECK_EQUAL(expected, avro_conversion);
BOOST_CHECK_EQUAL(valid.name(), "myrecord");
}

SEASTAR_THREAD_TEST_CASE(test_avro_schema_definition_custom_attributes) {
Expand Down
13 changes: 13 additions & 0 deletions src/v/pandaproxy/schema_registry/test/compatibility_protobuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ SEASTAR_THREAD_TEST_CASE(test_protobuf_simple) {
store.insert(schema1, pps::schema_version{1});
auto valid_simple
= pps::make_protobuf_schema_definition(store.store, schema1).get();
BOOST_REQUIRE_EQUAL(valid_simple.name({0}).value(), "Simple");
}

SEASTAR_THREAD_TEST_CASE(test_protobuf_nested) {
simple_sharded_store store;

auto schema1 = pps::canonical_schema{pps::subject{"nested"}, nested};
store.insert(schema1, pps::schema_version{1});
auto valid_nested
= pps::make_protobuf_schema_definition(store.store, schema1).get();
BOOST_REQUIRE_EQUAL(valid_nested.name({0}).value(), "A0");
BOOST_REQUIRE_EQUAL(valid_nested.name({1, 0, 2}).value(), "A1.B0.C2");
BOOST_REQUIRE_EQUAL(valid_nested.name({1, 0, 4}).value(), "A1.B0.C4");
}

SEASTAR_THREAD_TEST_CASE(test_protobuf_imported_failure) {
Expand Down
17 changes: 17 additions & 0 deletions src/v/pandaproxy/schema_registry/test/compatibility_protobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,20 @@ message Test3 {
Test2 id = 1;
})",
pps::schema_type::protobuf};

const auto nested = pps::canonical_schema_definition{
R"(
syntax = "proto3";
message A0 {}
message A1 {
message B0 {
message C0 {}
message C1 {}
message C2 {}
message C3 {}
message C4 {}
}
})",
pps::schema_type::protobuf};
7 changes: 7 additions & 0 deletions src/v/pandaproxy/schema_registry/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
#pragma once

#include "avro/ValidSchema.hh"
#include "kafka/protocol/errors.h"
#include "model/metadata.h"
#include "outcome.h"
#include "seastarx.h"
#include "utils/named_type.h"
#include "utils/string_switch.h"
Expand Down Expand Up @@ -131,6 +133,8 @@ class avro_schema_definition {
return {raw(), type()};
}

ss::sstring name() const;

private:
avro::ValidSchema _impl;
};
Expand Down Expand Up @@ -160,6 +164,9 @@ class protobuf_schema_definition {
return {raw(), type()};
}

::result<ss::sstring, kafka::error_code>
name(std::vector<int> const& fields) const;

private:
pimpl _impl;
};
Expand Down

0 comments on commit 5a76733

Please sign in to comment.