[v24.1.x] CORE-5766 Validate target node id when collecting health report #22910

Merged
3 changes: 3 additions & 0 deletions src/v/cluster/errc.h
@@ -89,6 +89,7 @@ enum class errc : int16_t {
topic_invalid_partitions_decreased,
producer_ids_vcluster_limit_exceeded,
validation_of_recovery_topic_failed,
invalid_target_node_id,
};

std::ostream& operator<<(std::ostream& o, errc err);
@@ -262,6 +263,8 @@ struct errc_category final : public std::error_category {
return "To many vclusters registered in producer state cache";
case errc::validation_of_recovery_topic_failed:
return "Validation of recovery topic failed";
case errc::invalid_target_node_id:
return "Request was intended for the node with different node id";
}
return "cluster::errc::unknown";
}
2 changes: 2 additions & 0 deletions src/v/cluster/errors.cc
@@ -166,6 +166,8 @@ std::ostream& operator<<(std::ostream& o, cluster::errc err) {
return o << "cluster::errc::producer_ids_vcluster_limit_exceeded";
case errc::validation_of_recovery_topic_failed:
return o << "cluster::errc::validation_of_recovery_topic_failed";
case errc::invalid_target_node_id:
return o << "cluster::errc::invalid_target_node_id";
}
}
} // namespace cluster
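
Note: a new enumerator has to be wired into three places, all touched by the hunks above: the `errc` enum itself, the `errc_category::message()` mapping, and the logging `operator<<`. Below is a minimal, self-contained sketch of the underlying `std::error_category` pattern; the `demo_errc`/`demo_category` names are illustrative, not Redpanda's actual code.

```cpp
// Sketch of the error-code registration pattern the hunks above extend.
// demo_errc / demo_category are illustrative names, not Redpanda's.
#include <cstdint>
#include <iostream>
#include <string>
#include <system_error>

enum class demo_errc : int16_t {
    success = 0,
    invalid_target_node_id, // the newly added enumerator
};

struct demo_category final : std::error_category {
    const char* name() const noexcept override { return "demo::errc"; }
    std::string message(int c) const override {
        switch (static_cast<demo_errc>(c)) {
        case demo_errc::success:
            return "Success";
        case demo_errc::invalid_target_node_id:
            return "Request was intended for a node with a different node id";
        }
        return "demo::errc::unknown";
    }
};

std::error_code make_error_code(demo_errc e) {
    static demo_category category;
    return {static_cast<int>(e), category};
}

int main() {
    auto ec = make_error_code(demo_errc::invalid_target_node_id);
    std::cout << ec.category().name() << ": " << ec.message() << '\n';
}
```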
13 changes: 8 additions & 5 deletions src/v/cluster/health_monitor_backend.cc
@@ -366,30 +366,33 @@ health_monitor_backend::collect_remote_node_health(model::node_id id) {
ss::this_shard_id(),
id,
max_metadata_age(),
[timeout](controller_client_protocol client) mutable {
[timeout, id](controller_client_protocol client) mutable {
return client.collect_node_health_report(
get_node_health_request{}, rpc::client_opts(timeout));
get_node_health_request(id), rpc::client_opts(timeout));
})
.then(&rpc::get_ctx_data<get_node_health_reply>)
.then([this, id](result<get_node_health_reply> reply) {
return process_node_reply(id, std::move(reply));
});
}

result<node_health_report>
map_reply_result(result<get_node_health_reply> reply) {
result<node_health_report> map_reply_result(
model::node_id target_node_id, result<get_node_health_reply> reply) {
if (!reply) {
return {reply.error()};
}
if (!reply.value().report.has_value()) {
return {reply.value().error};
}
if (reply.value().report->id != target_node_id) {
return {errc::invalid_target_node_id};
}
return {std::move(*reply.value().report)};
}

result<node_health_report> health_monitor_backend::process_node_reply(
model::node_id id, result<get_node_health_reply> reply) {
auto res = map_reply_result(std::move(reply));
auto res = map_reply_result(id, std::move(reply));
auto [status_it, _] = _status.try_emplace(id);
if (!res) {
vlog(
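
The reordered checks in `map_reply_result` are easiest to read in isolation: fail on a missing reply, then on a missing report, then on a node-id mismatch, and only then hand back the payload. The sketch below shows that decision ladder with simplified stand-in types; the real code uses `result<>`, `get_node_health_reply`, and `cluster::errc`.

```cpp
// Standalone version of the validation ladder; the types below are
// simplified stand-ins for the real result<> and cluster:: types.
#include <cassert>
#include <cstdint>
#include <optional>
#include <utility>

using node_id = int32_t;

enum class errc { success = 0, rpc_error, invalid_target_node_id };

struct node_health_report {
    node_id id{};
    // partition / disk state omitted
};

struct get_node_health_reply {
    errc error = errc::success;
    std::optional<node_health_report> report;
};

// Minimal result type: either a report or an error code.
struct report_result {
    std::optional<node_health_report> value;
    errc error = errc::success;
};

report_result map_reply_result(
  node_id target_node_id, std::optional<get_node_health_reply> reply) {
    // 1. transport-level failure: no reply at all
    if (!reply) {
        return {.error = errc::rpc_error};
    }
    // 2. application-level failure: the reply carries no report
    if (!reply->report.has_value()) {
        return {.error = reply->error};
    }
    // 3. the new check: the report must describe the node we asked about
    if (reply->report->id != target_node_id) {
        return {.error = errc::invalid_target_node_id};
    }
    return {.value = std::move(reply->report), .error = errc::success};
}

int main() {
    // A reply that claims to be node 2 while we asked node 1 is rejected.
    get_node_health_reply reply{
      .error = errc::success, .report = node_health_report{.id = 2}};
    auto res = map_reply_result(/*target_node_id=*/1, reply);
    assert(res.error == errc::invalid_target_node_id);
}
```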
4 changes: 2 additions & 2 deletions src/v/cluster/health_monitor_types.cc
@@ -267,8 +267,8 @@ std::ostream& operator<<(std::ostream& o, const partitions_filter& filter) {
return o;
}

std::ostream& operator<<(std::ostream& o, const get_node_health_request&) {
fmt::print(o, "{{}}");
std::ostream& operator<<(std::ostream& o, const get_node_health_request& r) {
fmt::print(o, "{{target_node_id: {}}}", r.get_target_node_id());
return o;
}

12 changes: 10 additions & 2 deletions src/v/cluster/health_monitor_types.h
@@ -430,10 +430,13 @@ using force_refresh = ss::bool_class<struct hm_force_refresh_tag>;
class get_node_health_request
: public serde::envelope<
get_node_health_request,
serde::version<0>,
serde::version<1>,
serde::compat_version<0>> {
public:
using rpc_adl_exempt = std::true_type;
get_node_health_request() = default;
explicit get_node_health_request(model::node_id target_node_id)
: _target_node_id(target_node_id) {}

friend bool
operator==(const get_node_health_request&, const get_node_health_request&)
@@ -442,9 +445,14 @@ class get_node_health_request
friend std::ostream&
operator<<(std::ostream&, const get_node_health_request&);

auto serde_fields() { return std::tie(_filter); }
auto serde_fields() { return std::tie(_filter, _target_node_id); }
static constexpr model::node_id node_id_not_set{-1};

model::node_id get_target_node_id() const { return _target_node_id; }

private:
// default value for backward compatibility
model::node_id _target_node_id = node_id_not_set;
/**
* This field is no longer used, as it never was. It was made private on
* purpose
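
The envelope version bump (`serde::version<0>` to `serde::version<1>`) together with the sentinel default is what keeps mixed-version clusters working: an older peer encodes only `_filter`, and decoding such a payload leaves `_target_node_id` at `node_id_not_set`, which the receiver treats as "do not validate". The sketch below illustrates defaulted-trailing-field compatibility with a hand-rolled, hypothetical framing that is much simpler than serde's real wire format.

```cpp
// Illustration of defaulted-trailing-field compatibility; this framing is
// hypothetical and much simpler than serde's real envelope encoding.
#include <cstdint>
#include <cstring>
#include <iostream>
#include <vector>

constexpr int32_t node_id_not_set = -1;

struct health_request {
    uint32_t filter_words = 0;                // stand-in for the real filter
    int32_t target_node_id = node_id_not_set; // field added in version 1
};

// Version 0 writers emit only the filter; version 1 writers also emit the
// target node id.
std::vector<uint8_t> encode(const health_request& r, int version) {
    std::vector<uint8_t> buf(
      sizeof(r.filter_words) + (version >= 1 ? sizeof(r.target_node_id) : 0));
    std::memcpy(buf.data(), &r.filter_words, sizeof(r.filter_words));
    if (version >= 1) {
        std::memcpy(
          buf.data() + sizeof(r.filter_words),
          &r.target_node_id,
          sizeof(r.target_node_id));
    }
    return buf;
}

// The decoder reads fields in declaration order and stops when the payload
// ends, so a version-0 payload leaves target_node_id at its sentinel.
health_request decode(const std::vector<uint8_t>& buf) {
    health_request r;
    size_t pos = 0;
    if (buf.size() - pos >= sizeof(r.filter_words)) {
        std::memcpy(&r.filter_words, buf.data() + pos, sizeof(r.filter_words));
        pos += sizeof(r.filter_words);
    }
    if (buf.size() - pos >= sizeof(r.target_node_id)) {
        std::memcpy(
          &r.target_node_id, buf.data() + pos, sizeof(r.target_node_id));
    }
    return r;
}

int main() {
    health_request req{.filter_words = 0, .target_node_id = 5};
    std::cout << decode(encode(req, 0)).target_node_id << '\n'; // -1 (old peer)
    std::cout << decode(encode(req, 1)).target_node_id << '\n'; // 5
}
```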
35 changes: 22 additions & 13 deletions src/v/cluster/node_status_backend.cc
@@ -236,7 +236,7 @@ ss::future<result<node_status>> node_status_backend::send_node_status_request(
})
.then(&rpc::get_ctx_data<node_status_reply>);

co_return process_reply(reply);
co_return process_reply(target, reply);
}

ss::future<> node_status_backend::maybe_create_client(
@@ -245,26 +245,18 @@
target, address, _rpc_tls_config, create_backoff_policy());
}

result<node_status>
node_status_backend::process_reply(result<node_status_reply> reply) {
result<node_status> node_status_backend::process_reply(
model::node_id target_node_id, result<node_status_reply> reply) {
vassert(ss::this_shard_id() == shard, "invoked on a wrong shard");

if (!reply.has_error()) {
_stats.rpcs_sent += 1;
auto& replier_metadata = reply.value().replier_metadata;

return node_status{
.node_id = replier_metadata.node_id,
.last_seen = rpc::clock_type::now()};
} else {
static constexpr auto rate_limit = std::chrono::seconds(1);
if (reply.has_error()) {
auto err = reply.error();
if (
err.category() == rpc::error_category()
&& static_cast<rpc::errc>(err.value())
== rpc::errc::client_request_timeout) {
_stats.rpcs_timed_out += 1;
}
static constexpr auto rate_limit = std::chrono::seconds(1);
static ss::logger::rate_limit rate(rate_limit);
clusterlog.log(
ss::log_level::debug,
@@ -273,6 +265,23 @@
err.message());
return err;
}

_stats.rpcs_sent += 1;
auto& replier_metadata = reply.value().replier_metadata;
if (replier_metadata.node_id != target_node_id) {
static ss::logger::rate_limit rate(rate_limit);
clusterlog.log(
ss::log_level::debug,
rate,
"Received reply from node with different node id. Expected: {}, "
"current: {}",
target_node_id,
replier_metadata.node_id);
return errc::invalid_target_node_id;
}

return node_status{
.node_id = replier_metadata.node_id, .last_seen = rpc::clock_type::now()};
}

ss::future<node_status_reply>
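
The node-id mismatch is logged at debug level through `ss::logger::rate_limit`, so a misconfigured or flapping peer cannot flood the log with one message per probe. The sketch below shows generic interval-based log rate limiting in plain C++; it is illustrative only and not Seastar's logger API.

```cpp
// Interval-based log rate limiting, similar in spirit to the
// ss::logger::rate_limit used above (plain C++, not Seastar's API).
#include <chrono>
#include <cstdint>
#include <iostream>
#include <string_view>

class rate_limited_logger {
public:
    explicit rate_limited_logger(std::chrono::milliseconds interval)
      : _interval(interval) {}

    // Emits the message only if the interval elapsed since the last emit;
    // otherwise counts the message as suppressed.
    void log(std::string_view msg) {
        auto now = std::chrono::steady_clock::now();
        if (now - _last < _interval) {
            ++_suppressed;
            return;
        }
        std::cerr << msg;
        if (_suppressed > 0) {
            std::cerr << " (" << _suppressed << " similar messages suppressed)";
        }
        std::cerr << '\n';
        _suppressed = 0;
        _last = now;
    }

private:
    std::chrono::milliseconds _interval;
    std::chrono::steady_clock::time_point _last{}; // epoch: first call always logs
    uint64_t _suppressed = 0;
};

int main() {
    rate_limited_logger log{std::chrono::milliseconds(1000)};
    for (int i = 0; i < 5; ++i) {
        log.log("Received reply from a node with a different node id");
    }
    // Only the first iteration prints; the other four are suppressed.
}
```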
3 changes: 2 additions & 1 deletion src/v/cluster/node_status_backend.h
@@ -76,7 +76,8 @@ class node_status_backend {
ss::future<> collect_and_store_updates();
ss::future<std::vector<node_status>> collect_updates_from_peers();

result<node_status> process_reply(result<node_status_reply>);
result<node_status> process_reply(
model::node_id target_node_id, result<node_status_reply> reply);
ss::future<node_status_reply> process_request(node_status_request);

ss::future<result<node_status>>
16 changes: 15 additions & 1 deletion src/v/cluster/service.cc
@@ -510,7 +510,21 @@ ss::future<get_cluster_health_reply> service::get_cluster_health_report(
}

ss::future<get_node_health_reply>
service::do_collect_node_health_report(get_node_health_request) {
service::do_collect_node_health_report(get_node_health_request req) {
// validate that the receiving node is the one the request is
// addressed to
if (
req.get_target_node_id() != get_node_health_request::node_id_not_set
&& req.get_target_node_id() != _controller->self()) {
vlog(
clusterlog.debug,
"Received a get_node_health request addressed to different node. "
"Requested node id: {}, current node id: {}",
req.get_target_node_id(),
_controller->self());
co_return get_node_health_reply{.error = errc::invalid_target_node_id};
}

auto res = co_await _hm_frontend.local().get_current_node_health();
if (res.has_error()) {
co_return get_node_health_reply{
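
On the receiving side the guard is symmetric: a request whose target id is set and differs from the local broker's id is rejected with `errc::invalid_target_node_id`, while requests carrying the `node_id_not_set` sentinel (sent by peers that predate the field) are still served, which keeps rolling upgrades safe. A compact sketch with hypothetical stand-in types:

```cpp
// Sketch of the server-side guard with stand-in types; the real handler
// builds a get_node_health_reply and goes through Redpanda's RPC layer.
#include <cstdint>

using node_id = int32_t;
constexpr node_id node_id_not_set = -1;

enum class errc { success = 0, invalid_target_node_id };

struct health_reply {
    errc error = errc::success;
    // report payload omitted
};

health_reply handle_collect_node_health(node_id requested, node_id self) {
    // Requests from peers that predate the new field carry the sentinel and
    // are served unconditionally, which keeps rolling upgrades working.
    if (requested != node_id_not_set && requested != self) {
        return {.error = errc::invalid_target_node_id};
    }
    // ... collect and return the local node health report ...
    return {};
}

int main() {
    auto rejected = handle_collect_node_health(/*requested=*/7, /*self=*/3);
    return rejected.error == errc::invalid_target_node_id ? 0 : 1;
}
```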
1 change: 1 addition & 0 deletions src/v/kafka/server/errors.h
@@ -107,6 +107,7 @@ constexpr error_code map_topic_error_code(cluster::errc code) {
case cluster::errc::waiting_for_shard_placement_update:
case cluster::errc::producer_ids_vcluster_limit_exceeded:
case cluster::errc::validation_of_recovery_topic_failed:
case cluster::errc::invalid_target_node_id:
break;
}
return error_code::unknown_server_error;
15 changes: 15 additions & 0 deletions tests/rptest/tests/node_folder_deletion_test.py
@@ -94,7 +94,22 @@ def test_deleting_node_folder(self):
wait_until(lambda: producer.produce_status.acked > 200000,
timeout_sec=120,
backoff_sec=0.5)

admin = Admin(self.redpanda)

# validate that the node with deleted folder is recognized as offline
def removed_node_is_reported_offline():
cluster_health = admin.get_cluster_health_overview()
return id in cluster_health['nodes_down']

wait_until(
removed_node_is_reported_offline,
timeout_sec=20,
backoff_sec=0.5,
err_msg=
f"Node {id} is expected to be marked as offline as it was replaced by new node"
)

# decommission a node that has been cleared
admin.decommission_broker(id)
waiter = NodeDecommissionWaiter(self.redpanda,