diff --git a/src/v/cluster/tx_snapshot_adl_utils.h b/src/v/cluster/tx_snapshot_adl_utils.h new file mode 100644 index 000000000000..da216426e538 --- /dev/null +++ b/src/v/cluster/tx_snapshot_adl_utils.h @@ -0,0 +1,285 @@ +// Copyright 2020 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "cluster/rm_stm.h" +#include "reflection/async_adl.h" + +namespace reflection { + +template +using fvec = fragmented_vector; + +using tx_snapshot = cluster::rm_stm::tx_snapshot; +using tx_snapshot_v0 = cluster::rm_stm::tx_snapshot_v0; +using tx_snapshot_v1 = cluster::rm_stm::tx_snapshot_v1; +using tx_snapshot_v2 = cluster::rm_stm::tx_snapshot_v2; +using tx_snapshot_v3 = cluster::rm_stm::tx_snapshot_v3; + +using tx_range = cluster::rm_stm::tx_range; +using prepare_marker = cluster::rm_stm::prepare_marker; +using abort_index = cluster::rm_stm::abort_index; +using seq_entry = cluster::rm_stm::seq_entry; +using seq_entry_v0 = cluster::rm_stm::seq_entry_v0; +using seq_entry_v1 = cluster::rm_stm::seq_entry_v1; + +template<> +struct async_adl { + ss::future<> to(iobuf& out, tx_snapshot snap) { + co_await detail::async_adl_list< + fragmented_vector>{} + .to(out, std::move(snap.fenced)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.ongoing)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.prepared)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.aborted)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.abort_indexes)); + reflection::serialize(out, snap.offset); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.seqs)); + co_await detail::async_adl_list>{} + .to(out, std::move(snap.tx_data)); + co_await detail::async_adl_list< + fvec>{} + .to(out, std::move(snap.expiration)); + } + + ss::future from(iobuf_parser& in) { + auto fenced + = co_await detail::async_adl_list>{} + .from(in); + auto ongoing = co_await detail::async_adl_list>{}.from( + in); + auto prepared + = co_await detail::async_adl_list>{}.from(in); + auto aborted = co_await detail::async_adl_list>{}.from( + in); + auto abort_indexes + = co_await detail::async_adl_list>{}.from(in); + auto offset = reflection::adl{}.from(in); + auto seqs = co_await detail::async_adl_list>{}.from(in); + auto tx_data = co_await detail::async_adl_list< + fvec>{} + .from(in); + auto expiration = co_await detail::async_adl_list< + fvec>{} + .from(in); + + co_return tx_snapshot{ + .fenced = std::move(fenced), + .ongoing = std::move(ongoing), + .prepared = std::move(prepared), + .aborted = std::move(aborted), + .abort_indexes = std::move(abort_indexes), + .offset = offset, + .seqs = std::move(seqs), + .tx_data = std::move(tx_data), + .expiration = std::move(expiration)}; + } +}; + +template<> +struct async_adl { + ss::future<> to(iobuf& out, tx_snapshot_v0 snap) { + co_await detail::async_adl_list< + fragmented_vector>{} + .to(out, std::move(snap.fenced)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.ongoing)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.prepared)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.aborted)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.abort_indexes)); + reflection::serialize(out, snap.offset); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.seqs)); + } + + ss::future from(iobuf_parser& in) { + auto fenced + = co_await detail::async_adl_list>{} + .from(in); + auto ongoing = co_await detail::async_adl_list>{}.from( + in); + auto prepared + = co_await detail::async_adl_list>{}.from(in); + auto aborted = co_await detail::async_adl_list>{}.from( + in); + auto abort_indexes + = co_await detail::async_adl_list>{}.from(in); + auto offset = reflection::adl{}.from(in); + auto seqs = co_await detail::async_adl_list>{}.from( + in); + + co_return tx_snapshot_v0{ + .fenced = std::move(fenced), + .ongoing = std::move(ongoing), + .prepared = std::move(prepared), + .aborted = std::move(aborted), + .abort_indexes = std::move(abort_indexes), + .offset = offset, + .seqs = std::move(seqs)}; + } +}; + +template<> +struct async_adl { + ss::future<> to(iobuf& out, tx_snapshot_v1 snap) { + co_await detail::async_adl_list< + fragmented_vector>{} + .to(out, std::move(snap.fenced)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.ongoing)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.prepared)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.aborted)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.abort_indexes)); + reflection::serialize(out, snap.offset); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.seqs)); + } + + ss::future from(iobuf_parser& in) { + auto fenced + = co_await detail::async_adl_list>{} + .from(in); + auto ongoing = co_await detail::async_adl_list>{}.from( + in); + auto prepared + = co_await detail::async_adl_list>{}.from(in); + auto aborted = co_await detail::async_adl_list>{}.from( + in); + auto abort_indexes + = co_await detail::async_adl_list>{}.from(in); + auto offset = reflection::adl{}.from(in); + auto seqs = co_await detail::async_adl_list>{}.from( + in); + + co_return tx_snapshot_v1{ + .fenced = std::move(fenced), + .ongoing = std::move(ongoing), + .prepared = std::move(prepared), + .aborted = std::move(aborted), + .abort_indexes = std::move(abort_indexes), + .offset = offset, + .seqs = std::move(seqs)}; + } +}; + +template<> +struct async_adl { + ss::future<> to(iobuf& out, tx_snapshot_v2 snap) { + co_await detail::async_adl_list< + fragmented_vector>{} + .to(out, std::move(snap.fenced)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.ongoing)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.prepared)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.aborted)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.abort_indexes)); + reflection::serialize(out, snap.offset); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.seqs)); + } + + ss::future from(iobuf_parser& in) { + auto fenced + = co_await detail::async_adl_list>{} + .from(in); + auto ongoing = co_await detail::async_adl_list>{}.from( + in); + auto prepared + = co_await detail::async_adl_list>{}.from(in); + auto aborted = co_await detail::async_adl_list>{}.from( + in); + auto abort_indexes + = co_await detail::async_adl_list>{}.from(in); + auto offset = reflection::adl{}.from(in); + auto seqs = co_await detail::async_adl_list>{}.from(in); + + co_return tx_snapshot_v2{ + .fenced = std::move(fenced), + .ongoing = std::move(ongoing), + .prepared = std::move(prepared), + .aborted = std::move(aborted), + .abort_indexes = std::move(abort_indexes), + .offset = offset, + .seqs = std::move(seqs)}; + } +}; + +template<> +struct async_adl { + ss::future<> to(iobuf& out, tx_snapshot_v3 snap) { + co_await detail::async_adl_list< + fragmented_vector>{} + .to(out, std::move(snap.fenced)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.ongoing)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.prepared)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.aborted)); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.abort_indexes)); + reflection::serialize(out, snap.offset); + co_await detail::async_adl_list>{}.to( + out, std::move(snap.seqs)); + co_await detail::async_adl_list< + fvec>{} + .to(out, std::move(snap.tx_seqs)); + co_await detail::async_adl_list< + fvec>{} + .to(out, std::move(snap.expiration)); + } + + ss::future from(iobuf_parser& in) { + auto fenced + = co_await detail::async_adl_list>{} + .from(in); + auto ongoing = co_await detail::async_adl_list>{}.from( + in); + auto prepared + = co_await detail::async_adl_list>{}.from(in); + auto aborted = co_await detail::async_adl_list>{}.from( + in); + auto abort_indexes + = co_await detail::async_adl_list>{}.from(in); + auto offset = reflection::adl{}.from(in); + auto seqs = co_await detail::async_adl_list>{}.from(in); + auto tx_seqs = co_await detail::async_adl_list< + fvec>{} + .from(in); + auto expiration = co_await detail::async_adl_list< + fvec>{} + .from(in); + + co_return tx_snapshot_v3{ + .fenced = std::move(fenced), + .ongoing = std::move(ongoing), + .prepared = std::move(prepared), + .aborted = std::move(aborted), + .abort_indexes = std::move(abort_indexes), + .offset = offset, + .seqs = std::move(seqs), + .tx_seqs = std::move(tx_seqs), + .expiration = std::move(expiration)}; + } +}; + +}; // namespace reflection