Skip to content

Commit

Permalink
rpc/netbuf: refactored as_scattered to be asynchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
ballard26 committed Dec 1, 2022
1 parent e938dcb commit af1a079
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 5 deletions.
4 changes: 2 additions & 2 deletions src/v/rpc/netbuf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ iobuf header_as_iobuf(const header& h) {
}
/// \brief used to send the bytes down the wire
/// we re-compute the header-checksum on every call
ss::scattered_message<char> netbuf::as_scattered() && {
ss::future<ss::scattered_message<char>> netbuf::as_scattered() && {
if (_hdr.correlation_id == 0 || _hdr.meta == 0) {
throw std::runtime_error(
"cannot compose scattered view with incomplete header. missing "
Expand All @@ -54,7 +54,7 @@ ss::scattered_message<char> netbuf::as_scattered() && {
_out.prepend(header_as_iobuf(_hdr));

// prepare for output
return iobuf_as_scattered(std::move(_out));
co_return iobuf_as_scattered(std::move(_out));
}

} // namespace rpc
2 changes: 1 addition & 1 deletion src/v/rpc/simple_protocol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ send_reply(ss::lw_shared_ptr<server_context_impl> ctx, netbuf buf) {
buf.set_compression(rpc::compression_type::zstd);
buf.set_correlation_id(ctx->get_header().correlation_id);

auto view = std::move(buf).as_scattered();
auto view = co_await std::move(buf).as_scattered();
if (ctx->res.conn_gate().is_closed()) {
// do not write if gate is closed
rpclog.debug(
Expand Down
2 changes: 1 addition & 1 deletion src/v/rpc/test/netbuf_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ SEASTAR_THREAD_TEST_CASE(netbuf_pod) {
n.set_service_method_id(66);
reflection::async_adl<pod>{}.to(n.buffer(), std::move(src)).get();
// forces the computation of the header
auto bufs = std::move(n).as_scattered().release().release();
auto bufs = std::move(n).as_scattered().get().release().release();
auto in = make_iobuf_input_stream(iobuf(std::move(bufs)));
const pod dst = rpc::parse_framed<pod>(in).get0();
BOOST_REQUIRE_EQUAL(src.x, dst.x);
Expand Down
2 changes: 1 addition & 1 deletion src/v/rpc/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ class netbuf {
public:
/// \brief used to send the bytes down the wire
/// we re-compute the header-checksum on every call
ss::scattered_message<char> as_scattered() &&;
ss::future<ss::scattered_message<char>> as_scattered() &&;

void set_status(rpc::status);
void set_correlation_id(uint32_t);
Expand Down

0 comments on commit af1a079

Please sign in to comment.