Partition balancer: rack awareness constraint repair #6845

Merged 10 commits on Oct 28, 2022
1 change: 1 addition & 0 deletions src/v/cluster/CMakeLists.txt
@@ -118,6 +118,7 @@ v_cc_library(
remote_topic_configuration_source.cc
partition_balancer_planner.cc
partition_balancer_backend.cc
partition_balancer_state.cc
Member

This seems like a nice cleanup: consolidating state.

@ztlpn (Contributor Author), Oct 27, 2022

Yeah, that was the idea. There isn't much consolidation right now, but we can use this class to store some balancing-specific indexes (e.g. a node -> ntp map). That will be helpful when we eventually need to get rid of those "iterate over all ntps" loops.
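For illustration, a minimal sketch of the kind of node -> ntp index hinted at above; the class, member names, and update hook are hypothetical and not part of this PR:

```cpp
// Hypothetical sketch, not part of this PR: it illustrates a node -> ntp
// index that partition_balancer_state could maintain so the planner can ask
// "which ntps live on this node?" without scanning the whole topic table.
#include "model/fundamental.h"
#include "model/metadata.h"

#include <absl/container/flat_hash_map.h>

#include <algorithm>
#include <vector>

namespace cluster {

class node_ntp_index_sketch {
public:
    // Would be called on every replica set change, next to handle_ntp_update.
    void update(
      const model::ntp& ntp,
      const std::vector<model::broker_shard>& prev,
      const std::vector<model::broker_shard>& next) {
        for (const auto& bs : prev) {
            auto& ntps = _ntps_by_node[bs.node_id];
            ntps.erase(
              std::remove(ntps.begin(), ntps.end(), ntp), ntps.end());
        }
        for (const auto& bs : next) {
            _ntps_by_node[bs.node_id].push_back(ntp);
        }
    }

    // Direct lookup instead of an "iterate over all ntps" loop.
    const std::vector<model::ntp>& ntps_on_node(model::node_id id) const {
        static const std::vector<model::ntp> empty;
        auto it = _ntps_by_node.find(id);
        return it == _ntps_by_node.end() ? empty : it->second;
    }

private:
    absl::flat_hash_map<model::node_id, std::vector<model::ntp>> _ntps_by_node;
};

} // namespace cluster
```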

Member

> get rid of those "iterate over all ntps" loops.
😍

partition_balancer_rpc_handler.cc
node_status_backend.cc
node_status_rpc_handler.cc
17 changes: 14 additions & 3 deletions src/v/cluster/controller.cc
@@ -29,6 +29,7 @@
#include "cluster/metadata_dissemination_service.h"
#include "cluster/metrics_reporter.h"
#include "cluster/partition_balancer_backend.h"
#include "cluster/partition_balancer_state.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/partition_manager.h"
#include "cluster/raft0_utils.h"
@@ -70,7 +71,11 @@ controller::controller(
, _shard_table(st)
, _storage(storage)
, _storage_node(storage_node)
, _tp_updates_dispatcher(_partition_allocator, _tp_state, _partition_leaders)
, _tp_updates_dispatcher(
_partition_allocator,
_tp_state,
_partition_leaders,
_partition_balancer_state)
, _security_manager(_credentials, _authorizer)
, _data_policy_manager(data_policy_table)
, _raft_manager(raft_manager)
@@ -96,6 +101,12 @@ ss::future<> controller::wire_up() {
[]() { return config::shard_local_cfg().superusers.bind(); });
})
.then([this] { return _tp_state.start(); })
.then([this] {
return _partition_balancer_state.start_single(
std::ref(_tp_state),
std::ref(_members_table),
std::ref(_partition_allocator));
})
.then([this] { _probe.start(); });
}

@@ -375,9 +386,8 @@ controller::start(std::vector<model::broker> initial_raft0_brokers) {
return _partition_balancer.start_single(
_raft0,
std::ref(_stm),
std::ref(_tp_state),
std::ref(_partition_balancer_state),
std::ref(_hm_frontend),
std::ref(_members_table),
std::ref(_partition_allocator),
std::ref(_tp_frontend),
config::shard_local_cfg().partition_autobalancing_mode.bind(),
@@ -443,6 +453,7 @@ ss::future<> controller::stop() {
.then([this] { return _tp_state.stop(); })
.then([this] { return _members_manager.stop(); })
.then([this] { return _drain_manager.stop(); })
.then([this] { return _partition_balancer_state.stop(); })
.then([this] { return _partition_allocator.stop(); })
.then([this] { return _partition_leaders.stop(); })
.then([this] { return _members_table.stop(); })
2 changes: 2 additions & 0 deletions src/v/cluster/controller.h
@@ -133,6 +133,8 @@ class controller {
ss::sharded<partition_allocator> _partition_allocator; // single instance
ss::sharded<topic_table> _tp_state; // instance per core
ss::sharded<members_table> _members_table; // instance per core
ss::sharded<partition_balancer_state>
_partition_balancer_state; // single instance
ss::sharded<partition_leaders_table>
_partition_leaders; // instance per core
ss::sharded<drain_manager> _drain_manager; // instance per core
1 change: 1 addition & 0 deletions src/v/cluster/fwd.h
@@ -48,6 +48,7 @@ class feature_frontend;
class feature_manager;
class drain_manager;
class partition_balancer_backend;
class partition_balancer_state;
class node_status_backend;
class node_status_table;

14 changes: 6 additions & 8 deletions src/v/cluster/partition_balancer_backend.cc
@@ -15,6 +15,7 @@
#include "cluster/logger.h"
#include "cluster/members_table.h"
#include "cluster/partition_balancer_planner.h"
#include "cluster/partition_balancer_state.h"
#include "cluster/topics_frontend.h"
#include "random/generators.h"

@@ -33,9 +34,8 @@ static constexpr std::chrono::seconds add_move_cmd_timeout = 10s;
partition_balancer_backend::partition_balancer_backend(
consensus_ptr raft0,
ss::sharded<controller_stm>& controller_stm,
ss::sharded<topic_table>& topic_table,
ss::sharded<partition_balancer_state>& state,
ss::sharded<health_monitor_frontend>& health_monitor,
ss::sharded<members_table>& members_table,
ss::sharded<partition_allocator>& partition_allocator,
ss::sharded<topics_frontend>& topics_frontend,
config::binding<model::partition_autobalancing_mode>&& mode,
@@ -46,9 +46,8 @@ partition_balancer_backend::partition_balancer_backend(
config::binding<size_t>&& movement_batch_size_bytes)
: _raft0(std::move(raft0))
, _controller_stm(controller_stm.local())
, _topic_table(topic_table.local())
, _state(state.local())
, _health_monitor(health_monitor.local())
, _members_table(members_table.local())
, _partition_allocator(partition_allocator.local())
, _topics_frontend(topics_frontend.local())
, _mode(std::move(mode))
@@ -160,16 +159,15 @@ ss::future<> partition_balancer_backend::do_tick() {
.movement_disk_size_batch = _movement_batch_size_bytes(),
.node_availability_timeout_sec = _availability_timeout(),
},
_topic_table,
_members_table,
_state,
_partition_allocator)
.plan_reassignments(health_report.value(), follower_metrics);

_last_leader_term = _raft0->term();
_last_tick_time = ss::lowres_clock::now();
_last_violations = std::move(plan_data.violations);
if (
_topic_table.has_updates_in_progress()
_state.topics().has_updates_in_progress()
|| plan_data.status == planner_status::cancellations_planned
|| plan_data.status == planner_status::movement_planned) {
_last_status = partition_balancer_status::in_progress;
@@ -193,7 +191,7 @@ ss::future<> partition_balancer_backend::do_tick() {
_last_status,
_last_violations.unavailable_nodes.size(),
_last_violations.full_nodes.size(),
_topic_table.updates_in_progress().size(),
_state.topics().updates_in_progress().size(),
plan_data.reassignments.size(),
plan_data.cancellations.size(),
plan_data.failed_reassignments_count);
6 changes: 2 additions & 4 deletions src/v/cluster/partition_balancer_backend.h
@@ -29,9 +29,8 @@ class partition_balancer_backend {
partition_balancer_backend(
consensus_ptr raft0,
ss::sharded<controller_stm>&,
ss::sharded<topic_table>&,
ss::sharded<partition_balancer_state>&,
ss::sharded<health_monitor_frontend>&,
ss::sharded<members_table>&,
ss::sharded<partition_allocator>&,
ss::sharded<topics_frontend>&,
config::binding<model::partition_autobalancing_mode>&& mode,
@@ -70,9 +69,8 @@ class partition_balancer_backend {
consensus_ptr _raft0;

controller_stm& _controller_stm;
topic_table& _topic_table;
partition_balancer_state& _state;
health_monitor_frontend& _health_monitor;
members_table& _members_table;
partition_allocator& _partition_allocator;
topics_frontend& _topics_frontend;

25 changes: 11 additions & 14 deletions src/v/cluster/partition_balancer_planner.cc
@@ -12,6 +12,7 @@

#include "cluster/cluster_utils.h"
#include "cluster/members_table.h"
#include "cluster/partition_balancer_state.h"
#include "cluster/partition_balancer_types.h"
#include "cluster/scheduling/constraints.h"
#include "cluster/scheduling/types.h"
@@ -51,12 +52,10 @@ distinct_from(const absl::flat_hash_set<model::node_id>& nodes) {

partition_balancer_planner::partition_balancer_planner(
planner_config config,
topic_table& topic_table,
members_table& members_table,
partition_balancer_state& state,
partition_allocator& partition_allocator)
: _config(config)
, _topic_table(topic_table)
, _members_table(members_table)
, _state(state)
, _partition_allocator(partition_allocator) {
_config.soft_max_disk_usage_ratio = std::min(
_config.soft_max_disk_usage_ratio, _config.hard_max_disk_usage_ratio);
@@ -67,7 +66,7 @@ void partition_balancer_planner::init_per_node_state(
const std::vector<raft::follower_metrics>& follower_metrics,
reallocation_request_state& rrs,
plan_data& result) const {
for (const auto& broker : _members_table.all_brokers()) {
for (const auto& broker : _state.members().all_brokers()) {
if (
broker->get_membership_state() == model::membership_state::removed) {
continue;
@@ -305,7 +304,7 @@ void partition_balancer_planner::get_unavailable_nodes_reassignments(
return;
}

for (const auto& t : _topic_table.topics_map()) {
for (const auto& t : _state.topics().topics_map()) {
for (const auto& a : t.second.get_assignments()) {
// End adding movements if batch is collected
if (rrs.planned_moves_size >= _config.movement_disk_size_batch) {
@@ -392,7 +391,7 @@ void partition_balancer_planner::get_full_node_reassignments(
}

absl::flat_hash_map<model::node_id, std::vector<model::ntp>> ntp_on_nodes;
for (const auto& t : _topic_table.topics_map()) {
for (const auto& t : _state.topics().topics_map()) {
for (const auto& a : t.second.get_assignments()) {
for (const auto& r : a.replicas) {
ntp_on_nodes[r.node_id].emplace_back(
@@ -429,7 +428,7 @@ void partition_balancer_planner::get_full_node_reassignments(
continue;
}

const auto& topic_metadata = _topic_table.topics_map().at(
const auto& topic_metadata = _state.topics().topics_map().at(
model::topic_namespace_view(partition_to_move));
const auto& current_assignments
= topic_metadata.get_assignments().find(
@@ -531,7 +530,7 @@ void partition_balancer_planner::get_full_node_reassignments(
*/
void partition_balancer_planner::get_unavailable_node_movement_cancellations(
plan_data& result, const reallocation_request_state& rrs) {
for (const auto& update : _topic_table.updates_in_progress()) {
for (const auto& update : _state.topics().updates_in_progress()) {
if (
update.second.get_state()
!= topic_table::in_progress_state::update_requested) {
@@ -547,7 +546,7 @@ void partition_balancer_planner::get_unavailable_node_movement_cancellations(
}
}

auto current_assignments = _topic_table.get_partition_assignment(
auto current_assignments = _state.topics().get_partition_assignment(
update.first);
if (!current_assignments.has_value()) {
continue;
@@ -583,7 +582,7 @@
return result;
}

if (_topic_table.has_updates_in_progress()) {
if (_state.topics().has_updates_in_progress()) {
get_unavailable_node_movement_cancellations(result, rrs);
if (!result.cancellations.empty()) {
result.status = status::cancellations_planned;
@@ -596,9 +595,7 @@
return result;
}

if (
!_topic_table.has_updates_in_progress()
&& !result.violations.is_empty()) {
if (!result.violations.is_empty()) {
init_ntp_sizes_from_health_report(health_report, rrs);
get_unavailable_nodes_reassignments(result, rrs);
get_full_node_reassignments(result, rrs);
6 changes: 2 additions & 4 deletions src/v/cluster/partition_balancer_planner.h
@@ -40,8 +40,7 @@ class partition_balancer_planner {
public:
partition_balancer_planner(
planner_config config,
topic_table& topic_table,
members_table& members_table,
partition_balancer_state& state,
partition_allocator& partition_allocator);

enum class status {
@@ -121,8 +120,7 @@ class partition_balancer_planner {
bool all_reports_received(const reallocation_request_state&);

planner_config _config;
topic_table& _topic_table;
members_table& _members_table;
partition_balancer_state& _state;
partition_allocator& _partition_allocator;
};

75 changes: 75 additions & 0 deletions src/v/cluster/partition_balancer_state.cc
@@ -0,0 +1,75 @@
/*
* Copyright 2022 Redpanda Data, Inc.
*
* Licensed as a Redpanda Enterprise file under the Redpanda Community
* License (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "cluster/partition_balancer_state.h"

#include "cluster/logger.h"
#include "cluster/scheduling/partition_allocator.h"

#include <absl/container/flat_hash_set.h>

namespace cluster {

partition_balancer_state::partition_balancer_state(
ss::sharded<topic_table>& topic_table,
ss::sharded<members_table>& members_table,
ss::sharded<partition_allocator>& pa)
: _topic_table(topic_table.local())
, _members_table(members_table.local())
, _partition_allocator(pa.local()) {}

void partition_balancer_state::handle_ntp_update(
const model::ns& ns,
const model::topic& tp,
model::partition_id p_id,
const std::vector<model::broker_shard>& prev,
const std::vector<model::broker_shard>& next) {
if (_partition_allocator.is_rack_awareness_enabled()) {
absl::flat_hash_set<model::rack_id> racks;
bool is_rack_constraint_violated = false;
for (const auto& bs : next) {
auto rack = _partition_allocator.state().get_rack_id(bs.node_id);
if (rack) {
auto res = racks.insert(std::move(*rack));
if (!res.second) {
is_rack_constraint_violated = true;
break;
}
}
}

model::ntp ntp(ns, tp, p_id);
if (is_rack_constraint_violated) {
auto res = _ntps_with_broken_rack_constraint.insert(std::move(ntp));
if (res.second) {
vlog(
clusterlog.debug,
"rack constraint violated for ntp: {}, "
"replica set change: {} -> {}",
ntp,
prev,
next);
}
} else {
auto erased = _ntps_with_broken_rack_constraint.erase(ntp);
if (erased > 0) {
vlog(
clusterlog.debug,
"rack constraint restored for ntp: {}, "
"replica set change: {} -> {}",
ntp,
prev,
next);
}
}
}
}

} // namespace cluster
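
Not part of the diff, but as context: a hedged sketch of how a caller such as topic_updates_dispatcher (which controller.cc above now hands a reference to partition_balancer_state) might drive handle_ntp_update when a partition's replica set changes. The free function and parameter names below are illustrative assumptions, not code from the PR.

```cpp
// Illustrative only: the real call site lives in topic_updates_dispatcher;
// this free function and its names are assumptions, not code from the PR.
#include "cluster/partition_balancer_state.h"
#include "model/fundamental.h"
#include "model/metadata.h"

#include <vector>

namespace cluster {

void notify_replica_set_change(
  partition_balancer_state& state,
  const model::ntp& ntp,
  const std::vector<model::broker_shard>& previous_replicas,
  const std::vector<model::broker_shard>& new_replicas) {
    // Re-evaluates the rack-awareness constraint for this partition and
    // adds/removes it from the set of ntps with a broken rack constraint.
    state.handle_ntp_update(
      ntp.ns,
      ntp.tp.topic,
      ntp.tp.partition,
      previous_replicas,
      new_replicas);
}

} // namespace cluster
```

This keeps _ntps_with_broken_rack_constraint up to date incrementally; the intent, as the PR title suggests, is that the balancer can then repair just the partitions in that set.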