Partition balancer: rack awareness constraint repair #6845

Merged: 10 commits, Oct 28, 2022
1 change: 1 addition & 0 deletions src/v/cluster/CMakeLists.txt
@@ -118,6 +118,7 @@ v_cc_library(
remote_topic_configuration_source.cc
partition_balancer_planner.cc
partition_balancer_backend.cc
partition_balancer_state.cc
Member:
this seems like a nice clean up--consolidating state.

@ztlpn (Contributor, Author) commented on Oct 27, 2022:

Yeah, that was the idea. Although there is not much consolidation right now, we can use this class to store some balancing-specific indexes (e.g. a node -> ntp map). That will be helpful when we eventually need to get rid of those "iterate over all ntps" loops (a rough sketch of such an index follows this thread).

Member:

> get rid of those "iterate over all ntps" loops.
😍
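
To make the idea from this thread concrete, here is a rough, hypothetical sketch of the kind of balancing-specific index mentioned above (a node -> ntp map). None of this is in the PR; the class name, methods, and the simplified stand-ins for model::node_id and model::ntp are invented for illustration. The point is that partition_balancer_state could maintain such an index incrementally from topic table updates, letting the planner ask "which partitions live on this node?" without scanning every ntp.

```cpp
// Hypothetical sketch only -- not part of this PR. Simplified stand-ins are
// used instead of model::node_id / model::ntp.
#include <compare>
#include <cstdint>
#include <map>
#include <set>
#include <string>

using node_id = int32_t;

struct ntp {
    std::string ns;
    std::string topic;
    int32_t partition;
    auto operator<=>(const ntp&) const = default;
};

// Index from node to the set of partitions hosted on that node, kept up to
// date as replicas are added and removed.
class ntps_by_node_index {
public:
    void add_replica(node_id node, const ntp& p) { _index[node].insert(p); }

    void remove_replica(node_id node, const ntp& p) {
        auto it = _index.find(node);
        if (it == _index.end()) {
            return;
        }
        it->second.erase(p);
        if (it->second.empty()) {
            _index.erase(it);
        }
    }

    // The planner can ask only for the partitions hosted on a given node
    // (e.g. an unavailable one) instead of iterating over all ntps.
    const std::set<ntp>* ntps_on(node_id node) const {
        auto it = _index.find(node);
        return it == _index.end() ? nullptr : &it->second;
    }

private:
    std::map<node_id, std::set<ntp>> _index;
};
```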

partition_balancer_rpc_handler.cc
node_status_backend.cc
node_status_rpc_handler.cc
17 changes: 14 additions & 3 deletions src/v/cluster/controller.cc
@@ -29,6 +29,7 @@
#include "cluster/metadata_dissemination_service.h"
#include "cluster/metrics_reporter.h"
#include "cluster/partition_balancer_backend.h"
#include "cluster/partition_balancer_state.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/partition_manager.h"
#include "cluster/raft0_utils.h"
@@ -70,7 +71,11 @@ controller::controller(
, _shard_table(st)
, _storage(storage)
, _storage_node(storage_node)
, _tp_updates_dispatcher(_partition_allocator, _tp_state, _partition_leaders)
, _tp_updates_dispatcher(
_partition_allocator,
_tp_state,
_partition_leaders,
_partition_balancer_state)
, _security_manager(_credentials, _authorizer)
, _data_policy_manager(data_policy_table)
, _raft_manager(raft_manager)
@@ -96,6 +101,12 @@ ss::future<> controller::wire_up() {
[]() { return config::shard_local_cfg().superusers.bind(); });
})
.then([this] { return _tp_state.start(); })
.then([this] {
return _partition_balancer_state.start_single(
std::ref(_tp_state),
std::ref(_members_table),
std::ref(_partition_allocator));
})
.then([this] { _probe.start(); });
}

@@ -375,9 +386,8 @@ controller::start(std::vector<model::broker> initial_raft0_brokers) {
return _partition_balancer.start_single(
_raft0,
std::ref(_stm),
std::ref(_tp_state),
std::ref(_partition_balancer_state),
std::ref(_hm_frontend),
std::ref(_members_table),
std::ref(_partition_allocator),
std::ref(_tp_frontend),
config::shard_local_cfg().partition_autobalancing_mode.bind(),
@@ -443,6 +453,7 @@ ss::future<> controller::stop() {
.then([this] { return _tp_state.stop(); })
.then([this] { return _members_manager.stop(); })
.then([this] { return _drain_manager.stop(); })
.then([this] { return _partition_balancer_state.stop(); })
.then([this] { return _partition_allocator.stop(); })
.then([this] { return _partition_leaders.stop(); })
.then([this] { return _members_table.stop(); })
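A side note on the wiring shown in this file, for readers less familiar with Seastar: _partition_balancer_state is a ss::sharded<> service created only on shard 0 via start_single(), while its dependencies are passed as std::ref(...) so the constructor receives ss::sharded<T>& references it can later .local() into. The following is a hedged, stand-alone sketch of that pattern with made-up toy service types; it is not the actual controller code.

```cpp
// Sketch of the start_single()/std::ref wiring pattern, with toy types.
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>

#include <functional>

namespace ss = seastar;

struct topic_state {
    ss::future<> stop() { return ss::make_ready_future<>(); }
};

struct balancer_state {
    // Receives the sharded wrapper itself; the real service can call
    // _topics.local() on shard 0 when it needs the per-core instance.
    explicit balancer_state(ss::sharded<topic_state>& topics)
      : _topics(topics) {}
    ss::future<> stop() { return ss::make_ready_future<>(); }
    ss::sharded<topic_state>& _topics;
};

struct mini_controller {
    ss::sharded<topic_state> _tp_state;    // instance per core
    ss::sharded<balancer_state> _balancer; // single instance, shard 0 only

    ss::future<> wire_up() {
        co_await _tp_state.start();
        co_await _balancer.start_single(std::ref(_tp_state));
    }

    // Stop in reverse start order, mirroring controller::stop() above.
    ss::future<> stop() {
        co_await _balancer.stop();
        co_await _tp_state.stop();
    }
};
```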
2 changes: 2 additions & 0 deletions src/v/cluster/controller.h
@@ -133,6 +133,8 @@ class controller {
ss::sharded<partition_allocator> _partition_allocator; // single instance
ss::sharded<topic_table> _tp_state; // instance per core
ss::sharded<members_table> _members_table; // instance per core
ss::sharded<partition_balancer_state>
_partition_balancer_state; // single instance
ss::sharded<partition_leaders_table>
_partition_leaders; // instance per core
ss::sharded<drain_manager> _drain_manager; // instance per core
1 change: 1 addition & 0 deletions src/v/cluster/fwd.h
@@ -48,6 +48,7 @@ class feature_frontend;
class feature_manager;
class drain_manager;
class partition_balancer_backend;
class partition_balancer_state;
class node_status_backend;
class node_status_table;

45 changes: 20 additions & 25 deletions src/v/cluster/partition_balancer_backend.cc
@@ -15,6 +15,7 @@
#include "cluster/logger.h"
#include "cluster/members_table.h"
#include "cluster/partition_balancer_planner.h"
#include "cluster/partition_balancer_state.h"
#include "cluster/topics_frontend.h"
#include "random/generators.h"

@@ -33,9 +34,8 @@ static constexpr std::chrono::seconds add_move_cmd_timeout = 10s;
partition_balancer_backend::partition_balancer_backend(
consensus_ptr raft0,
ss::sharded<controller_stm>& controller_stm,
ss::sharded<topic_table>& topic_table,
ss::sharded<partition_balancer_state>& state,
ss::sharded<health_monitor_frontend>& health_monitor,
ss::sharded<members_table>& members_table,
ss::sharded<partition_allocator>& partition_allocator,
ss::sharded<topics_frontend>& topics_frontend,
config::binding<model::partition_autobalancing_mode>&& mode,
@@ -46,9 +46,8 @@
config::binding<size_t>&& movement_batch_size_bytes)
: _raft0(std::move(raft0))
, _controller_stm(controller_stm.local())
, _topic_table(topic_table.local())
, _state(state.local())
, _health_monitor(health_monitor.local())
, _members_table(members_table.local())
, _partition_allocator(partition_allocator.local())
, _topics_frontend(topics_frontend.local())
, _mode(std::move(mode))
@@ -160,16 +159,15 @@ ss::future<> partition_balancer_backend::do_tick() {
.movement_disk_size_batch = _movement_batch_size_bytes(),
.node_availability_timeout_sec = _availability_timeout(),
},
_topic_table,
_members_table,
_state,
_partition_allocator)
.plan_reassignments(health_report.value(), follower_metrics);

_last_leader_term = _raft0->term();
_last_tick_time = ss::lowres_clock::now();
_last_violations = std::move(plan_data.violations);
if (
_topic_table.has_updates_in_progress()
_state.topics().has_updates_in_progress()
|| plan_data.status == planner_status::cancellations_planned
|| plan_data.status == planner_status::movement_planned) {
_last_status = partition_balancer_status::in_progress;
@@ -193,51 +191,48 @@ ss::future<> partition_balancer_backend::do_tick() {
_last_status,
_last_violations.unavailable_nodes.size(),
_last_violations.full_nodes.size(),
_topic_table.updates_in_progress().size(),
_state.topics().updates_in_progress().size(),
plan_data.reassignments.size(),
plan_data.cancellations.size(),
plan_data.failed_reassignments_count);
}

co_await ss::max_concurrent_for_each(
plan_data.cancellations, 32, [this, current_term](model::ntp& ntp) {
vlog(clusterlog.info, "cancel movement for ntp {}", ntp);
return _topics_frontend
.cancel_moving_partition_replicas(
ntp,
model::timeout_clock::now() + add_move_cmd_timeout,
current_term)
.then([ntp = std::move(ntp)](auto errc) {
vlog(
clusterlog.info,
"{} movement cancellation submitted, errc: {}",
ntp,
errc);
if (errc) {
vlog(
clusterlog.warn,
"submitting {} movement cancellation failed, error: {}",
ntp,
errc.message());
}
});
});

co_await ss::max_concurrent_for_each(
plan_data.reassignments,
32,
[this, current_term](ntp_reassignments& reassignment) {
vlog(
clusterlog.info,
"moving {} to {}",
reassignment.ntp,
reassignment.allocation_units.get_assignments().front().replicas);

return _topics_frontend
.move_partition_replicas(
reassignment.ntp,
reassignment.allocation_units.get_assignments().front().replicas,
model::timeout_clock::now() + add_move_cmd_timeout,
current_term)
.then([reassignment = std::move(reassignment)](auto errc) {
vlog(
clusterlog.info,
"{} reassignment submitted, errc: {}",
reassignment.ntp,
errc);
if (errc) {
vlog(
clusterlog.warn,
"submitting {} reassignment failed, error: {}",
reassignment.ntp,
errc.message());
}
});
});
}
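One more hedged aside on the submission loops above: ss::max_concurrent_for_each(range, 32, func) walks the planned cancellations and reassignments while keeping at most 32 submissions in flight at once. Below is a minimal, self-contained toy example of that Seastar utility; the submit_move() helper and the move IDs are invented for illustration and have nothing to do with the balancer's real types.

```cpp
// Toy example of bounded-concurrency submission with max_concurrent_for_each.
#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>

#include <chrono>
#include <iostream>
#include <numeric>
#include <vector>

using namespace std::chrono_literals;

// Stand-in for submitting one partition move to the controller; returns an
// error code (0 == success).
seastar::future<int> submit_move(int move_id) {
    co_await seastar::sleep(10ms); // pretend controller round-trip
    co_return 0;
}

int main(int argc, char** argv) {
    seastar::app_template app;
    return app.run(argc, argv, []() -> seastar::future<> {
        std::vector<int> planned_moves(100);
        std::iota(planned_moves.begin(), planned_moves.end(), 0);

        // At most 32 submit_move() calls are pending at any moment.
        co_await seastar::max_concurrent_for_each(
          planned_moves, 32, [](int move_id) -> seastar::future<> {
              auto errc = co_await submit_move(move_id);
              if (errc != 0) {
                  std::cout << "move " << move_id << " failed: " << errc
                            << "\n";
              }
          });
    });
}
```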
6 changes: 2 additions & 4 deletions src/v/cluster/partition_balancer_backend.h
@@ -29,9 +29,8 @@ class partition_balancer_backend {
partition_balancer_backend(
consensus_ptr raft0,
ss::sharded<controller_stm>&,
ss::sharded<topic_table>&,
ss::sharded<partition_balancer_state>&,
ss::sharded<health_monitor_frontend>&,
ss::sharded<members_table>&,
ss::sharded<partition_allocator>&,
ss::sharded<topics_frontend>&,
config::binding<model::partition_autobalancing_mode>&& mode,
@@ -70,9 +69,8 @@
consensus_ptr _raft0;

controller_stm& _controller_stm;
topic_table& _topic_table;
partition_balancer_state& _state;
health_monitor_frontend& _health_monitor;
members_table& _members_table;
partition_allocator& _partition_allocator;
topics_frontend& _topics_frontend;
