From da9cafadf1f5802c51b25b669359cd81b5ab0ab5 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Mon, 17 Oct 2022 14:36:25 +0300 Subject: [PATCH 01/10] c/partition_allocator: add is_rack_awareness_enabled method --- src/v/cluster/scheduling/partition_allocator.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/v/cluster/scheduling/partition_allocator.h b/src/v/cluster/scheduling/partition_allocator.h index af55c0c126ba..89e55636a978 100644 --- a/src/v/cluster/scheduling/partition_allocator.h +++ b/src/v/cluster/scheduling/partition_allocator.h @@ -86,6 +86,8 @@ class partition_allocator { void remove_allocations( const std::vector&, partition_allocation_domain); + bool is_rack_awareness_enabled() const { return _enable_rack_awareness(); } + allocation_state& state() { return *_state; } private: From a6316b1c40f19be6b6eedef6fef234aab4f79163 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Tue, 18 Oct 2022 13:59:46 +0300 Subject: [PATCH 02/10] c/partition_balancer: add partition_balancer_state This is a class that stores state that is needed for functioning of the partition balancer. This commit also wires it up to topic_updates_dispatcher and adds code maintaining a set of ntps that have rack awareness constraint violated. --- src/v/cluster/CMakeLists.txt | 1 + src/v/cluster/controller.cc | 17 ++++- src/v/cluster/controller.h | 2 + src/v/cluster/fwd.h | 1 + src/v/cluster/partition_balancer_backend.cc | 14 ++-- src/v/cluster/partition_balancer_backend.h | 6 +- src/v/cluster/partition_balancer_planner.cc | 25 +++---- src/v/cluster/partition_balancer_planner.h | 6 +- src/v/cluster/partition_balancer_state.cc | 75 +++++++++++++++++++ src/v/cluster/partition_balancer_state.h | 57 ++++++++++++++ .../partition_balancer_planner_fixture.h | 11 ++- src/v/cluster/tests/topic_table_fixture.h | 6 ++ .../tests/topic_updates_dispatcher_test.cc | 2 +- src/v/cluster/topic_updates_dispatcher.cc | 71 ++++++++++++++++-- src/v/cluster/topic_updates_dispatcher.h | 11 ++- 15 files changed, 257 insertions(+), 48 deletions(-) create mode 100644 src/v/cluster/partition_balancer_state.cc create mode 100644 src/v/cluster/partition_balancer_state.h diff --git a/src/v/cluster/CMakeLists.txt b/src/v/cluster/CMakeLists.txt index 972d8593bf05..e3dfb72b7950 100644 --- a/src/v/cluster/CMakeLists.txt +++ b/src/v/cluster/CMakeLists.txt @@ -118,6 +118,7 @@ v_cc_library( remote_topic_configuration_source.cc partition_balancer_planner.cc partition_balancer_backend.cc + partition_balancer_state.cc partition_balancer_rpc_handler.cc node_status_backend.cc node_status_rpc_handler.cc diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 1f3dbaf436b1..38fb0aec175f 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -29,6 +29,7 @@ #include "cluster/metadata_dissemination_service.h" #include "cluster/metrics_reporter.h" #include "cluster/partition_balancer_backend.h" +#include "cluster/partition_balancer_state.h" #include "cluster/partition_leaders_table.h" #include "cluster/partition_manager.h" #include "cluster/raft0_utils.h" @@ -70,7 +71,11 @@ controller::controller( , _shard_table(st) , _storage(storage) , _storage_node(storage_node) - , _tp_updates_dispatcher(_partition_allocator, _tp_state, _partition_leaders) + , _tp_updates_dispatcher( + _partition_allocator, + _tp_state, + _partition_leaders, + _partition_balancer_state) , _security_manager(_credentials, _authorizer) , _data_policy_manager(data_policy_table) , _raft_manager(raft_manager) @@ -96,6 +101,12 @@ 
ss::future<> controller::wire_up() { []() { return config::shard_local_cfg().superusers.bind(); }); }) .then([this] { return _tp_state.start(); }) + .then([this] { + return _partition_balancer_state.start_single( + std::ref(_tp_state), + std::ref(_members_table), + std::ref(_partition_allocator)); + }) .then([this] { _probe.start(); }); } @@ -375,9 +386,8 @@ controller::start(std::vector initial_raft0_brokers) { return _partition_balancer.start_single( _raft0, std::ref(_stm), - std::ref(_tp_state), + std::ref(_partition_balancer_state), std::ref(_hm_frontend), - std::ref(_members_table), std::ref(_partition_allocator), std::ref(_tp_frontend), config::shard_local_cfg().partition_autobalancing_mode.bind(), @@ -443,6 +453,7 @@ ss::future<> controller::stop() { .then([this] { return _tp_state.stop(); }) .then([this] { return _members_manager.stop(); }) .then([this] { return _drain_manager.stop(); }) + .then([this] { return _partition_balancer_state.stop(); }) .then([this] { return _partition_allocator.stop(); }) .then([this] { return _partition_leaders.stop(); }) .then([this] { return _members_table.stop(); }) diff --git a/src/v/cluster/controller.h b/src/v/cluster/controller.h index a73ca24c627b..8e397ab25235 100644 --- a/src/v/cluster/controller.h +++ b/src/v/cluster/controller.h @@ -133,6 +133,8 @@ class controller { ss::sharded _partition_allocator; // single instance ss::sharded _tp_state; // instance per core ss::sharded _members_table; // instance per core + ss::sharded + _partition_balancer_state; // single instance ss::sharded _partition_leaders; // instance per core ss::sharded _drain_manager; // instance per core diff --git a/src/v/cluster/fwd.h b/src/v/cluster/fwd.h index f81c67c934eb..79d127fa1db6 100644 --- a/src/v/cluster/fwd.h +++ b/src/v/cluster/fwd.h @@ -48,6 +48,7 @@ class feature_frontend; class feature_manager; class drain_manager; class partition_balancer_backend; +class partition_balancer_state; class node_status_backend; class node_status_table; diff --git a/src/v/cluster/partition_balancer_backend.cc b/src/v/cluster/partition_balancer_backend.cc index 9a68b75aaa7e..2e2754d1cfb5 100644 --- a/src/v/cluster/partition_balancer_backend.cc +++ b/src/v/cluster/partition_balancer_backend.cc @@ -15,6 +15,7 @@ #include "cluster/logger.h" #include "cluster/members_table.h" #include "cluster/partition_balancer_planner.h" +#include "cluster/partition_balancer_state.h" #include "cluster/topics_frontend.h" #include "random/generators.h" @@ -33,9 +34,8 @@ static constexpr std::chrono::seconds add_move_cmd_timeout = 10s; partition_balancer_backend::partition_balancer_backend( consensus_ptr raft0, ss::sharded& controller_stm, - ss::sharded& topic_table, + ss::sharded& state, ss::sharded& health_monitor, - ss::sharded& members_table, ss::sharded& partition_allocator, ss::sharded& topics_frontend, config::binding&& mode, @@ -46,9 +46,8 @@ partition_balancer_backend::partition_balancer_backend( config::binding&& movement_batch_size_bytes) : _raft0(std::move(raft0)) , _controller_stm(controller_stm.local()) - , _topic_table(topic_table.local()) + , _state(state.local()) , _health_monitor(health_monitor.local()) - , _members_table(members_table.local()) , _partition_allocator(partition_allocator.local()) , _topics_frontend(topics_frontend.local()) , _mode(std::move(mode)) @@ -160,8 +159,7 @@ ss::future<> partition_balancer_backend::do_tick() { .movement_disk_size_batch = _movement_batch_size_bytes(), .node_availability_timeout_sec = _availability_timeout(), }, - _topic_table, - 
_members_table, + _state, _partition_allocator) .plan_reassignments(health_report.value(), follower_metrics); @@ -169,7 +167,7 @@ ss::future<> partition_balancer_backend::do_tick() { _last_tick_time = ss::lowres_clock::now(); _last_violations = std::move(plan_data.violations); if ( - _topic_table.has_updates_in_progress() + _state.topics().has_updates_in_progress() || plan_data.status == planner_status::cancellations_planned || plan_data.status == planner_status::movement_planned) { _last_status = partition_balancer_status::in_progress; @@ -193,7 +191,7 @@ ss::future<> partition_balancer_backend::do_tick() { _last_status, _last_violations.unavailable_nodes.size(), _last_violations.full_nodes.size(), - _topic_table.updates_in_progress().size(), + _state.topics().updates_in_progress().size(), plan_data.reassignments.size(), plan_data.cancellations.size(), plan_data.failed_reassignments_count); diff --git a/src/v/cluster/partition_balancer_backend.h b/src/v/cluster/partition_balancer_backend.h index 416cfa03b15a..cdefdf223757 100644 --- a/src/v/cluster/partition_balancer_backend.h +++ b/src/v/cluster/partition_balancer_backend.h @@ -29,9 +29,8 @@ class partition_balancer_backend { partition_balancer_backend( consensus_ptr raft0, ss::sharded&, - ss::sharded&, + ss::sharded&, ss::sharded&, - ss::sharded&, ss::sharded&, ss::sharded&, config::binding&& mode, @@ -70,9 +69,8 @@ class partition_balancer_backend { consensus_ptr _raft0; controller_stm& _controller_stm; - topic_table& _topic_table; + partition_balancer_state& _state; health_monitor_frontend& _health_monitor; - members_table& _members_table; partition_allocator& _partition_allocator; topics_frontend& _topics_frontend; diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index b0164edc3e1b..502a9cd28891 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -12,6 +12,7 @@ #include "cluster/cluster_utils.h" #include "cluster/members_table.h" +#include "cluster/partition_balancer_state.h" #include "cluster/partition_balancer_types.h" #include "cluster/scheduling/constraints.h" #include "cluster/scheduling/types.h" @@ -51,12 +52,10 @@ distinct_from(const absl::flat_hash_set& nodes) { partition_balancer_planner::partition_balancer_planner( planner_config config, - topic_table& topic_table, - members_table& members_table, + partition_balancer_state& state, partition_allocator& partition_allocator) : _config(config) - , _topic_table(topic_table) - , _members_table(members_table) + , _state(state) , _partition_allocator(partition_allocator) { _config.soft_max_disk_usage_ratio = std::min( _config.soft_max_disk_usage_ratio, _config.hard_max_disk_usage_ratio); @@ -67,7 +66,7 @@ void partition_balancer_planner::init_per_node_state( const std::vector& follower_metrics, reallocation_request_state& rrs, plan_data& result) const { - for (const auto& broker : _members_table.all_brokers()) { + for (const auto& broker : _state.members().all_brokers()) { if ( broker->get_membership_state() == model::membership_state::removed) { continue; @@ -305,7 +304,7 @@ void partition_balancer_planner::get_unavailable_nodes_reassignments( return; } - for (const auto& t : _topic_table.topics_map()) { + for (const auto& t : _state.topics().topics_map()) { for (const auto& a : t.second.get_assignments()) { // End adding movements if batch is collected if (rrs.planned_moves_size >= _config.movement_disk_size_batch) { @@ -392,7 +391,7 @@ void 
partition_balancer_planner::get_full_node_reassignments( } absl::flat_hash_map> ntp_on_nodes; - for (const auto& t : _topic_table.topics_map()) { + for (const auto& t : _state.topics().topics_map()) { for (const auto& a : t.second.get_assignments()) { for (const auto& r : a.replicas) { ntp_on_nodes[r.node_id].emplace_back( @@ -429,7 +428,7 @@ void partition_balancer_planner::get_full_node_reassignments( continue; } - const auto& topic_metadata = _topic_table.topics_map().at( + const auto& topic_metadata = _state.topics().topics_map().at( model::topic_namespace_view(partition_to_move)); const auto& current_assignments = topic_metadata.get_assignments().find( @@ -531,7 +530,7 @@ void partition_balancer_planner::get_full_node_reassignments( */ void partition_balancer_planner::get_unavailable_node_movement_cancellations( plan_data& result, const reallocation_request_state& rrs) { - for (const auto& update : _topic_table.updates_in_progress()) { + for (const auto& update : _state.topics().updates_in_progress()) { if ( update.second.get_state() != topic_table::in_progress_state::update_requested) { @@ -547,7 +546,7 @@ void partition_balancer_planner::get_unavailable_node_movement_cancellations( } } - auto current_assignments = _topic_table.get_partition_assignment( + auto current_assignments = _state.topics().get_partition_assignment( update.first); if (!current_assignments.has_value()) { continue; @@ -583,7 +582,7 @@ partition_balancer_planner::plan_reassignments( return result; } - if (_topic_table.has_updates_in_progress()) { + if (_state.topics().has_updates_in_progress()) { get_unavailable_node_movement_cancellations(result, rrs); if (!result.cancellations.empty()) { result.status = status::cancellations_planned; @@ -596,9 +595,7 @@ partition_balancer_planner::plan_reassignments( return result; } - if ( - !_topic_table.has_updates_in_progress() - && !result.violations.is_empty()) { + if (!result.violations.is_empty()) { init_ntp_sizes_from_health_report(health_report, rrs); get_unavailable_nodes_reassignments(result, rrs); get_full_node_reassignments(result, rrs); diff --git a/src/v/cluster/partition_balancer_planner.h b/src/v/cluster/partition_balancer_planner.h index c9d6f084a6df..4aed6041597b 100644 --- a/src/v/cluster/partition_balancer_planner.h +++ b/src/v/cluster/partition_balancer_planner.h @@ -40,8 +40,7 @@ class partition_balancer_planner { public: partition_balancer_planner( planner_config config, - topic_table& topic_table, - members_table& members_table, + partition_balancer_state& state, partition_allocator& partition_allocator); enum class status { @@ -121,8 +120,7 @@ class partition_balancer_planner { bool all_reports_received(const reallocation_request_state&); planner_config _config; - topic_table& _topic_table; - members_table& _members_table; + partition_balancer_state& _state; partition_allocator& _partition_allocator; }; diff --git a/src/v/cluster/partition_balancer_state.cc b/src/v/cluster/partition_balancer_state.cc new file mode 100644 index 000000000000..ba5e50b66fec --- /dev/null +++ b/src/v/cluster/partition_balancer_state.cc @@ -0,0 +1,75 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#include "cluster/partition_balancer_state.h" + +#include "cluster/logger.h" +#include "cluster/scheduling/partition_allocator.h" + +#include + +namespace cluster { + +partition_balancer_state::partition_balancer_state( + ss::sharded& topic_table, + ss::sharded& members_table, + ss::sharded& pa) + : _topic_table(topic_table.local()) + , _members_table(members_table.local()) + , _partition_allocator(pa.local()) {} + +void partition_balancer_state::handle_ntp_update( + const model::ns& ns, + const model::topic& tp, + model::partition_id p_id, + const std::vector& prev, + const std::vector& next) { + if (_partition_allocator.is_rack_awareness_enabled()) { + absl::flat_hash_set racks; + bool is_rack_constraint_violated = false; + for (const auto& bs : next) { + auto rack = _partition_allocator.state().get_rack_id(bs.node_id); + if (rack) { + auto res = racks.insert(std::move(*rack)); + if (!res.second) { + is_rack_constraint_violated = true; + break; + } + } + } + + model::ntp ntp(ns, tp, p_id); + if (is_rack_constraint_violated) { + auto res = _ntps_with_broken_rack_constraint.insert(std::move(ntp)); + if (res.second) { + vlog( + clusterlog.debug, + "rack constraint violated for ntp: {}, " + "replica set change: {} -> {}", + ntp, + prev, + next); + } + } else { + auto erased = _ntps_with_broken_rack_constraint.erase(ntp); + if (erased > 0) { + vlog( + clusterlog.debug, + "rack constraint restored for ntp: {}, " + "replica set change: {} -> {}", + ntp, + prev, + next); + } + } + } +} + +} // namespace cluster diff --git a/src/v/cluster/partition_balancer_state.h b/src/v/cluster/partition_balancer_state.h new file mode 100644 index 000000000000..ea0062bc78e7 --- /dev/null +++ b/src/v/cluster/partition_balancer_state.h @@ -0,0 +1,57 @@ +/* + * Copyright 2022 Redpanda Data, Inc. + * + * Licensed as a Redpanda Enterprise file under the Redpanda Community + * License (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + */ + +#pragma once + +#include "cluster/fwd.h" +#include "model/fundamental.h" +#include "model/metadata.h" + +#include + +namespace cluster { + +/// Class that stores state that is needed for functioning of the partition +/// balancer. It is updated from the controller log (via +/// topic_updates_dispatcher) +class partition_balancer_state { +public: + partition_balancer_state( + ss::sharded&, + ss::sharded&, + ss::sharded&); + + const topic_table& topics() const { return _topic_table; } + + const members_table& members() const { return _members_table; } + + const absl::btree_set& + ntps_with_broken_rack_constraint() const { + return _ntps_with_broken_rack_constraint; + } + + /// Called when the replica set of an ntp changes. Note that this doesn't + /// account for in-progress moves - the function is called only once when + /// the move is started. 
+ void handle_ntp_update( + const model::ns&, + const model::topic&, + model::partition_id, + const std::vector& prev, + const std::vector& next); + +private: + topic_table& _topic_table; + members_table& _members_table; + partition_allocator& _partition_allocator; + absl::btree_set _ntps_with_broken_rack_constraint; +}; + +} // namespace cluster diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h index eccf24aa5224..e78a1f2ec525 100644 --- a/src/v/cluster/tests/partition_balancer_planner_fixture.h +++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h @@ -13,6 +13,7 @@ #include "cluster/members_table.h" #include "cluster/partition_balancer_planner.h" +#include "cluster/partition_balancer_state.h" #include "cluster/tests/utils.h" #include "cluster/topic_updates_dispatcher.h" #include "model/metadata.h" @@ -50,7 +51,7 @@ static std::unique_ptr create_allocation_node( struct controller_workers { public: controller_workers() - : dispatcher(allocator, table, leaders) { + : dispatcher(allocator, table, leaders, state) { table.start().get(); members.start_single().get(); allocator @@ -62,9 +63,13 @@ struct controller_workers { config::mock_binding(uint32_t{partitions_reserve_shard0}), config::mock_binding(true)) .get(); + state + .start_single(std::ref(table), std::ref(members), std::ref(allocator)) + .get(); } ~controller_workers() { + state.stop().get(); table.stop().get(); allocator.stop().get(); members.stop().get(); @@ -74,6 +79,7 @@ struct controller_workers { ss::sharded allocator; ss::sharded table; ss::sharded leaders; + ss::sharded state; cluster::topic_updates_dispatcher dispatcher; }; @@ -85,8 +91,7 @@ struct partition_balancer_planner_fixture { .hard_max_disk_usage_ratio = 0.95, .movement_disk_size_batch = reallocation_batch_size, .node_availability_timeout_sec = std::chrono::minutes(1)}, - workers.table.local(), - workers.members.local(), + workers.state.local(), workers.allocator.local()) {} cluster::topic_configuration_assignment make_tp_configuration( diff --git a/src/v/cluster/tests/topic_table_fixture.h b/src/v/cluster/tests/topic_table_fixture.h index 3952934a39f8..bb543fe23762 100644 --- a/src/v/cluster/tests/topic_table_fixture.h +++ b/src/v/cluster/tests/topic_table_fixture.h @@ -12,6 +12,7 @@ #pragma once #include "cluster/members_table.h" +#include "cluster/partition_balancer_state.h" #include "cluster/scheduling/allocation_node.h" #include "cluster/scheduling/partition_allocator.h" #include "cluster/tests/utils.h" @@ -58,9 +59,13 @@ struct topic_table_fixture { create_allocation_node(model::node_id(2), 12)); allocator.local().register_node( create_allocation_node(model::node_id(3), 4)); + pb_state + .start_single(std::ref(table), std::ref(members), std::ref(allocator)) + .get(); } ~topic_table_fixture() { + pb_state.stop().get(); table.stop().get0(); allocator.stop().get0(); members.stop().get0(); @@ -149,5 +154,6 @@ struct topic_table_fixture { ss::sharded allocator; ss::sharded table; ss::sharded leaders; + ss::sharded pb_state; ss::abort_source as; }; diff --git a/src/v/cluster/tests/topic_updates_dispatcher_test.cc b/src/v/cluster/tests/topic_updates_dispatcher_test.cc index ce5ac0fb2a29..ebe72f06b9f5 100644 --- a/src/v/cluster/tests/topic_updates_dispatcher_test.cc +++ b/src/v/cluster/tests/topic_updates_dispatcher_test.cc @@ -21,7 +21,7 @@ using namespace std::chrono_literals; struct topic_table_updates_dispatcher_fixture : topic_table_fixture { 
topic_table_updates_dispatcher_fixture() - : dispatcher(allocator, table, leaders) {} + : dispatcher(allocator, table, leaders, pb_state) {} void create_topics() { auto cmd_1 = make_create_topic_cmd("test_tp_1", 1, 3); diff --git a/src/v/cluster/topic_updates_dispatcher.cc b/src/v/cluster/topic_updates_dispatcher.cc index d5f9c4e61af5..fc5ca09d999b 100644 --- a/src/v/cluster/topic_updates_dispatcher.cc +++ b/src/v/cluster/topic_updates_dispatcher.cc @@ -11,6 +11,7 @@ #include "cluster/cluster_utils.h" #include "cluster/commands.h" +#include "cluster/partition_balancer_state.h" #include "cluster/partition_leaders_table.h" #include "cluster/topic_table.h" #include "model/fundamental.h" @@ -28,10 +29,12 @@ namespace cluster { topic_updates_dispatcher::topic_updates_dispatcher( ss::sharded& pal, ss::sharded& table, - ss::sharded& leaders) + ss::sharded& leaders, + ss::sharded& pb_state) : _partition_allocator(pal) , _topic_table(table) - , _partition_leaders_table(leaders) {} + , _partition_leaders_table(leaders) + , _partition_balancer_state(pb_state) {} ss::future topic_updates_dispatcher::apply_update(model::record_batch b) { @@ -41,7 +44,6 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { return ss::visit( std::move(cmd), [this, base_offset](delete_topic_cmd del_cmd) { - auto tp_ns = del_cmd.key; auto topic_assignments = _topic_table.local().get_topic_assignments(del_cmd.value); in_progress_map in_progress; @@ -52,6 +54,7 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { } return dispatch_updates_to_cores(del_cmd, base_offset) .then([this, + tp_ns = std::move(del_cmd.key), topic_assignments = std::move(topic_assignments), in_progress = std::move(in_progress), allocation_domain = get_allocation_domain( @@ -62,6 +65,16 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { "Topic had to exist before successful delete"); deallocate_topic( *topic_assignments, in_progress, allocation_domain); + + for (const auto& p_as : *topic_assignments) { + _partition_balancer_state.local() + .handle_ntp_update( + tp_ns.ns, + tp_ns.tp, + p_as.id, + p_as.replicas, + {}); + } } return ec; @@ -71,9 +84,21 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { return dispatch_updates_to_cores(create_cmd, base_offset) .then([this, create_cmd](std::error_code ec) { if (ec == errc::success) { + const auto& tp_ns = create_cmd.key; update_allocations( create_cmd.value.assignments, - get_allocation_domain(create_cmd.key)); + get_allocation_domain(tp_ns)); + + for (const auto& p_as : + create_cmd.value.assignments) { + _partition_balancer_state.local() + .handle_ntp_update( + tp_ns.ns, + tp_ns.tp, + p_as.id, + {}, + p_as.replicas); + } } return ec; }) @@ -103,15 +128,23 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { .then([this, p_as = std::move(p_as), cmd = std::move(cmd)]( std::error_code ec) { if (!ec) { + const auto& ntp = cmd.key; vassert( p_as.has_value(), "Partition {} have to exist before successful " "partition reallocation", - cmd.key); + ntp); auto to_add = subtract_replica_sets( cmd.value, p_as->replicas); _partition_allocator.local().add_allocations( - to_add, get_allocation_domain(cmd.key)); + to_add, get_allocation_domain(ntp)); + + _partition_balancer_state.local().handle_ntp_update( + ntp.ns, + ntp.tp.topic, + ntp.tp.partition, + p_as->replicas, + cmd.value); } return ec; }); @@ -143,6 +176,12 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { current_assignment->replicas, *new_target_replicas); 
_partition_allocator.local().remove_allocations( to_delete, get_allocation_domain(ntp)); + _partition_balancer_state.local().handle_ntp_update( + ntp.ns, + ntp.tp.topic, + ntp.tp.partition, + current_assignment->replicas, + *new_target_replicas); return ec; }); }, @@ -179,9 +218,20 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { return dispatch_updates_to_cores(cmd, base_offset) .then([this, cmd](std::error_code ec) { if (ec == errc::success) { + const auto& tp_ns = cmd.key; update_allocations( cmd.value.assignments, - get_allocation_domain(cmd.key)); + get_allocation_domain(tp_ns)); + + for (const auto& p_as : cmd.value.assignments) { + _partition_balancer_state.local() + .handle_ntp_update( + tp_ns.ns, + tp_ns.tp, + p_as.id, + {}, + p_as.replicas); + } } return ec; }); @@ -234,6 +284,13 @@ topic_updates_dispatcher::apply_update(model::record_batch b) { replicas, assigment_it->replicas); _partition_allocator.local().add_allocations( to_add, get_allocation_domain(ntp)); + _partition_balancer_state.local() + .handle_ntp_update( + ntp.ns, + ntp.tp.topic, + ntp.tp.partition, + assigment_it->replicas, + replicas); } } return ec; diff --git a/src/v/cluster/topic_updates_dispatcher.h b/src/v/cluster/topic_updates_dispatcher.h index b73e273993d3..d582ba042731 100644 --- a/src/v/cluster/topic_updates_dispatcher.h +++ b/src/v/cluster/topic_updates_dispatcher.h @@ -11,6 +11,7 @@ #pragma once #include "cluster/commands.h" +#include "cluster/fwd.h" #include "cluster/scheduling/partition_allocator.h" #include "cluster/topic_table.h" #include "cluster/types.h" @@ -23,9 +24,9 @@ namespace cluster { // The topic updates dispatcher is responsible for receiving update_apply // upcalls from controller state machine and propagating updates to topic state -// core local copies. The dispatcher handles partition_allocator updates. The -// partition allocator exists only on core 0 hence the updates have to be -// executed at the same core. +// core local copies. The dispatcher also handles partition_allocator and +// partition_balancer_state updates. Those services exist only on core 0 hence +// the updates have to be executed at the same core. // // // +----------------+ +------------+ @@ -53,7 +54,8 @@ class topic_updates_dispatcher { topic_updates_dispatcher( ss::sharded&, ss::sharded&, - ss::sharded&); + ss::sharded&, + ss::sharded&); ss::future apply_update(model::record_batch); @@ -96,6 +98,7 @@ class topic_updates_dispatcher { ss::sharded& _partition_allocator; ss::sharded& _topic_table; ss::sharded& _partition_leaders_table; + ss::sharded& _partition_balancer_state; }; } // namespace cluster From 670b3188f833b51ec388989b03d6b0815212b74e Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Wed, 19 Oct 2022 18:06:45 +0300 Subject: [PATCH 03/10] c/partition_balancer_planner: get rid of the metadata param Replication factor now is anyway calculated from the number of replicas of partition 0 so we don't need the metadata object if we have the set of replicas. 
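
Concretely, the only thing get_partition_constraints() used the metadata for
was the replication factor, which the partition assignment already carries.
Abridged sketch of the change (see the diff below for the full context):

    // before: replication factor taken from the topic metadata
    return partition_constraints(
      assignments.id,
      topic_metadata.get_replication_factor(),
      std::move(allocation_constraints));

    // after: derived from the partition's own replica set
    return partition_constraints(
      assignments.id,
      assignments.replicas.size(),
      std::move(allocation_constraints));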
--- src/v/cluster/partition_balancer_planner.cc | 5 +---- src/v/cluster/partition_balancer_planner.h | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index 502a9cd28891..b2dd4dac022d 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -201,7 +201,6 @@ std::optional partition_balancer_planner::get_partition_size( partition_constraints partition_balancer_planner::get_partition_constraints( const partition_assignment& assignments, - const topic_metadata& topic_metadata, size_t partition_size, double max_disk_usage_ratio, reallocation_request_state& rrs) const { @@ -232,7 +231,7 @@ partition_constraints partition_balancer_planner::get_partition_constraints( return partition_constraints( assignments.id, - topic_metadata.get_replication_factor(), + assignments.replicas.size(), std::move(allocation_constraints)); } @@ -337,7 +336,6 @@ void partition_balancer_planner::get_unavailable_nodes_reassignments( auto constraints = get_partition_constraints( a, - t.second.metadata, partition_size.value(), _config.hard_max_disk_usage_ratio, rrs); @@ -443,7 +441,6 @@ void partition_balancer_planner::get_full_node_reassignments( auto constraints = get_partition_constraints( *current_assignments, - topic_metadata.metadata, ntp_size_it->first, _config.soft_max_disk_usage_ratio, rrs); diff --git a/src/v/cluster/partition_balancer_planner.h b/src/v/cluster/partition_balancer_planner.h index 4aed6041597b..a0c6d67eb35b 100644 --- a/src/v/cluster/partition_balancer_planner.h +++ b/src/v/cluster/partition_balancer_planner.h @@ -80,7 +80,6 @@ class partition_balancer_planner { partition_constraints get_partition_constraints( const partition_assignment& assignments, - const topic_metadata& topic_metadata, size_t partition_size, double max_disk_usage_ratio, reallocation_request_state&) const; From 5f00b5197865a5f2c787ea4f1f618dcad212dd73 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 14 Oct 2022 20:43:59 +0300 Subject: [PATCH 04/10] c/partition_balancer: schedule moves repairing rack awareness constraint --- src/v/cluster/partition_balancer_planner.cc | 103 +++++++++++++++++++- src/v/cluster/partition_balancer_planner.h | 3 + 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index b2dd4dac022d..4df1f4c3f5d5 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -358,6 +358,104 @@ void partition_balancer_planner::get_unavailable_nodes_reassignments( } } +/// Try to fix ntps that have several replicas in one rack (these ntps can +/// appear because rack awareness constraint is not a hard constraint, e.g. when +/// a rack dies and we move all replicas that resided on dead nodes to live +/// ones). +/// +/// We go over all such ntps (a list maintained by partition_balancer_state) and +/// if the number of currently live racks is more than the number of racks that +/// the ntp is replicated to, we try to schedule a move. For each rack we +/// arbitrarily choose the first appearing replica to remain there (note: this +/// is probably not optimal choice). 
+void partition_balancer_planner::get_rack_constraint_repair_reassignments( + plan_data& result, reallocation_request_state& rrs) { + if (_state.ntps_with_broken_rack_constraint().empty()) { + return; + } + + absl::flat_hash_set available_racks; + for (auto node_id : rrs.all_nodes) { + if (!rrs.timed_out_unavailable_nodes.contains(node_id)) { + auto rack = _partition_allocator.state().get_rack_id(node_id); + if (rack) { + available_racks.insert(*rack); + } + } + } + + for (const auto& ntp : _state.ntps_with_broken_rack_constraint()) { + if (rrs.planned_moves_size >= _config.movement_disk_size_batch) { + return; + } + + if (rrs.moving_partitions.contains(ntp)) { + continue; + } + + auto assignment = _state.topics().get_partition_assignment(ntp); + if (!assignment) { + vlog(clusterlog.warn, "assignment for ntp {} not found", ntp); + continue; + } + + const auto& orig_replicas = assignment->replicas; + + std::vector stable_replicas; + absl::flat_hash_set cur_racks; + for (const auto& bs : orig_replicas) { + auto rack = _partition_allocator.state().get_rack_id(bs.node_id); + if (rack) { + auto [it, inserted] = cur_racks.insert(*rack); + if (inserted) { + stable_replicas.push_back(bs); + } + } else { + stable_replicas.push_back(bs); + } + } + + if (stable_replicas.size() == orig_replicas.size()) { + continue; + } + + if (available_racks.size() <= cur_racks.size()) { + // Can't repair the constraint if we don't have an available rack to + // place a replica there. + continue; + } + + auto partition_size = get_partition_size(ntp, rrs); + if ( + !partition_size.has_value() + || !is_partition_movement_possible(orig_replicas, rrs)) { + result.failed_reassignments_count += 1; + continue; + } + + auto constraints = get_partition_constraints( + *assignment, + partition_size.value(), + _config.hard_max_disk_usage_ratio, + rrs); + + auto new_allocation_units = get_reallocation( + ntp, + *assignment, + partition_size.value(), + std::move(constraints), + stable_replicas, + rrs); + if (new_allocation_units) { + result.reassignments.emplace_back(ntp_reassignments{ + .ntp = ntp, + .allocation_units = std::move(new_allocation_units.value())}); + } else { + result.failed_reassignments_count += 1; + } + } +} + /* * Function is trying to move ntps out of node that are violating * soft_max_disk_usage_ratio. It takes nodes in reverse used space ratio order. 
@@ -592,9 +690,12 @@ partition_balancer_planner::plan_reassignments( return result; } - if (!result.violations.is_empty()) { + if ( + !result.violations.is_empty() + || !_state.ntps_with_broken_rack_constraint().empty()) { init_ntp_sizes_from_health_report(health_report, rrs); get_unavailable_nodes_reassignments(result, rrs); + get_rack_constraint_repair_reassignments(result, rrs); get_full_node_reassignments(result, rrs); if (!result.reassignments.empty()) { result.status = status::movement_planned; diff --git a/src/v/cluster/partition_balancer_planner.h b/src/v/cluster/partition_balancer_planner.h index a0c6d67eb35b..5a0b8fbdf74f 100644 --- a/src/v/cluster/partition_balancer_planner.h +++ b/src/v/cluster/partition_balancer_planner.h @@ -95,6 +95,9 @@ class partition_balancer_planner { void get_unavailable_nodes_reassignments( plan_data&, reallocation_request_state&); + void get_rack_constraint_repair_reassignments( + plan_data&, reallocation_request_state&); + void get_full_node_reassignments(plan_data&, reallocation_request_state&); void init_per_node_state( From 7c35407fc87cff74bc9772072500db60f5849ca5 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 20 Oct 2022 14:19:23 +0300 Subject: [PATCH 05/10] ut/partition_balancer: easier creation of nodes with defined rack ids --- .../tests/partition_balancer_planner_fixture.h | 16 ++++++++++++++-- .../tests/partition_balancer_planner_test.cc | 7 ++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h index e78a1f2ec525..2b7ef4dea0b4 100644 --- a/src/v/cluster/tests/partition_balancer_planner_fixture.h +++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h @@ -136,8 +136,15 @@ struct partition_balancer_planner_fixture { } void allocator_register_nodes( - size_t nodes_amount, - const std::optional& rack_id = std::nullopt) { + size_t nodes_amount, const std::vector& rack_ids = {}) { + if (!rack_ids.empty()) { + vassert( + rack_ids.size() == nodes_amount, + "mismatch between rack ids: {} and the number of new nodes: {}", + rack_ids, + nodes_amount); + } + auto& members_table = workers.members.local(); std::vector new_brokers; @@ -146,6 +153,11 @@ struct partition_balancer_planner_fixture { } for (size_t i = 0; i < nodes_amount; ++i) { + std::optional rack_id; + if (!rack_ids.empty()) { + rack_id = model::rack_id{rack_ids[i]}; + } + workers.allocator.local().register_node(create_allocation_node( model::node_id(last_node_idx), 4, rack_id)); new_brokers.push_back(model::broker( diff --git a/src/v/cluster/tests/partition_balancer_planner_test.cc b/src/v/cluster/tests/partition_balancer_planner_test.cc index 3f737c3391bf..9151ccc65d8c 100644 --- a/src/v/cluster/tests/partition_balancer_planner_test.cc +++ b/src/v/cluster/tests/partition_balancer_planner_test.cc @@ -699,12 +699,9 @@ FIXTURE_TEST(test_node_cancelation, partition_balancer_planner_fixture) { */ FIXTURE_TEST(test_rack_awareness, partition_balancer_planner_fixture) { vlog(logger.debug, "test_rack_awareness"); - allocator_register_nodes(1, model::rack_id("rack_1")); - allocator_register_nodes(1, model::rack_id("rack_2")); - allocator_register_nodes(1, model::rack_id("rack_3")); + allocator_register_nodes(3, {"rack_1", "rack_2", "rack_3"}); create_topic("topic-1", 1, 3); - allocator_register_nodes(1, model::rack_id("rack_3")); - allocator_register_nodes(1, model::rack_id("rack_4")); + allocator_register_nodes(2, {"rack_3", "rack_4"}); auto hr = 
create_health_report(); // Make node_4 disk free size less to make partition allocator disk usage From 34aea6b69289aac1a3aa68ea43a8e35cf9421382 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Thu, 20 Oct 2022 14:44:47 +0300 Subject: [PATCH 06/10] ut/partition_balancer: add rack awareness repair unit test --- .../tests/partition_balancer_planner_test.cc | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/src/v/cluster/tests/partition_balancer_planner_test.cc b/src/v/cluster/tests/partition_balancer_planner_test.cc index 9151ccc65d8c..5c7df1673922 100644 --- a/src/v/cluster/tests/partition_balancer_planner_test.cc +++ b/src/v/cluster/tests/partition_balancer_planner_test.cc @@ -820,3 +820,42 @@ FIXTURE_TEST( BOOST_REQUIRE_EQUAL(plan_data.cancellations.size(), 0); BOOST_REQUIRE_EQUAL(plan_data.failed_reassignments_count, 1); } + +/* + * 4 nodes; 1 topic; 2 partitions with 3 replicas in 2 racks; + * Planner should repair the rack awareness constraint. + * node_0: partitions: 2; rack: rack_A; + * node_1: partitions: 2; rack: rack_B; + * node_2: partitions: 2; rack: rack_B; + * node_3: partitions: 0; rack: rack_C; + */ +FIXTURE_TEST(test_rack_awareness_repair, partition_balancer_planner_fixture) { + allocator_register_nodes(3, {"rack_A", "rack_B", "rack_B"}); + // Partitions will be created with rack constraint violated (because there + // are only 2 racks) + create_topic("topic-1", 2, 3); + allocator_register_nodes(1, {"rack_C"}); + + auto hr = create_health_report(); + auto fm = create_follower_metrics({}); + + auto plan_data = planner.plan_reassignments(hr, fm); + + check_violations(plan_data, {}, {}); + BOOST_REQUIRE_EQUAL(plan_data.reassignments.size(), 2); + for (const auto& ras : plan_data.reassignments) { + const auto& new_replicas + = ras.allocation_units.get_assignments().front().replicas; + BOOST_REQUIRE_EQUAL(new_replicas.size(), 3); + absl::node_hash_set racks; + for (const auto& bs : new_replicas) { + auto rack = workers.allocator.local().state().get_rack_id( + bs.node_id); + BOOST_REQUIRE(rack); + racks.insert(*rack); + } + BOOST_REQUIRE_EQUAL(racks.size(), 3); + } + BOOST_REQUIRE_EQUAL(plan_data.cancellations.size(), 0); + BOOST_REQUIRE_EQUAL(plan_data.failed_reassignments_count, 0); +} From 4fb4ba51279d2ac563f879a3491155bd9c37045c Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 14 Oct 2022 20:43:43 +0300 Subject: [PATCH 07/10] tests: add PartitionBalancerTest.test_rack_constraint_repair --- tests/rptest/tests/partition_balancer_test.py | 63 ++++++++++++++++--- 1 file changed, 54 insertions(+), 9 deletions(-) diff --git a/tests/rptest/tests/partition_balancer_test.py b/tests/rptest/tests/partition_balancer_test.py index 370e1ca77b5e..71512f93c8f0 100644 --- a/tests/rptest/tests/partition_balancer_test.py +++ b/tests/rptest/tests/partition_balancer_test.py @@ -339,6 +339,13 @@ def test_movement_cancellations(self): self.run_validation(consumer_timeout_sec=CONSUMER_TIMEOUT) + def check_rack_placement(self, topic, rack_layout): + for p in self.topic_partitions_replicas(topic): + racks = {rack_layout[r - 1] for r in p.replicas} + assert ( + len(racks) == 3 + ), f"bad rack placement {racks} for partition id: {p.id} (replicas: {p.replicas})" + @cluster(num_nodes=8, log_allow_list=CHAOS_LOG_ALLOW_LIST) def test_rack_awareness(self): extra_rp_conf = self._extra_rp_conf | {"enable_rack_awareness": True} @@ -356,14 +363,7 @@ def test_rack_awareness(self): self.topic = TopicSpec(partition_count=random.randint(20, 30)) 
self.client().create_topic(self.topic) - def check_rack_placement(): - for p in self.topic_partitions_replicas(self.topic): - racks = {(r - 1) // 2 for r in p.replicas} - assert ( - len(racks) == 3 - ), f"bad rack placement {racks} for partition id: {p.id} (replicas: {p.replicas})" - - check_rack_placement() + self.check_rack_placement(self.topic, rack_layout) self.start_producer(1) self.start_consumer(1) @@ -375,11 +375,56 @@ def check_rack_placement(): ns.make_unavailable(node, wait_for_previous_node=True) self.wait_until_ready(expected_unavailable_node=node) self.check_no_replicas_on_node(node) - check_rack_placement() + self.check_rack_placement(self.topic, rack_layout) ns.make_available() self.run_validation(consumer_timeout_sec=CONSUMER_TIMEOUT) + @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST) + def test_rack_constraint_repair(self): + """ + Test partition balancer rack constraint repair with the following scenario: + * kill a node, making the whole AZ unavailable. + * partition balancer will move partitions to remaining nodes, + causing rack awareness constraint to be violated + * start another node, making the number of available AZs enough to satisfy + rack constraint + * check that the balancer repaired the constraint + """ + + extra_rp_conf = self._extra_rp_conf | {"enable_rack_awareness": True} + self.redpanda = RedpandaService(self.test_context, + num_brokers=5, + extra_rp_conf=extra_rp_conf) + + rack_layout = "ABBCC" + for ix, node in enumerate(self.redpanda.nodes): + self.redpanda.set_extra_node_conf(node, {"rack": rack_layout[ix]}) + + self.redpanda.start(nodes=self.redpanda.nodes[0:4]) + self._client = DefaultClient(self.redpanda) + + self.topic = TopicSpec(partition_count=random.randint(20, 30)) + self.client().create_topic(self.topic) + + self.check_rack_placement(self.topic, rack_layout) + + self.start_producer(1) + self.start_consumer(1) + self.await_startup() + + with self.NodeStopper(self) as ns: + node = self.redpanda.nodes[3] + ns.make_unavailable(node) + self.wait_until_ready(expected_unavailable_node=node) + + self.redpanda.start_node(self.redpanda.nodes[4]) + + self.wait_until_ready(expected_unavailable_node=node) + self.check_rack_placement(self.topic, rack_layout) + + self.run_validation(consumer_timeout_sec=CONSUMER_TIMEOUT) + @cluster(num_nodes=7, log_allow_list=CHAOS_LOG_ALLOW_LIST) def test_fuzz_admin_ops(self): self.start_redpanda(num_nodes=5) From 6e41138edb66f4e109b5dd790dd3f48c99b14860 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 21 Oct 2022 15:59:32 +0300 Subject: [PATCH 08/10] c/partition_balancer: improve logging 1) don't log happy path of submitting reassignments/cancellations 2) move planned reassignments/cancellation logging to planner and add more context: previous replicas and reason. 
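
For illustration, a planned move is now logged once by the planner, roughly
of the form (ntp and replica sets here are illustrative):

    ntp: {kafka/topic-1/0}, planning move <previous replicas> -> <new replicas> (reason: rack constraint repair)

where the reason is one of "unavailable nodes", "rack constraint repair" or
"full nodes"; the backend now only logs a warning if submitting the resulting
move or cancellation command fails.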
--- src/v/cluster/partition_balancer_backend.cc | 31 ++++++------- src/v/cluster/partition_balancer_planner.cc | 49 ++++++++++++++++----- src/v/cluster/partition_balancer_planner.h | 6 +++ 3 files changed, 59 insertions(+), 27 deletions(-) diff --git a/src/v/cluster/partition_balancer_backend.cc b/src/v/cluster/partition_balancer_backend.cc index 2e2754d1cfb5..546408dddd2b 100644 --- a/src/v/cluster/partition_balancer_backend.cc +++ b/src/v/cluster/partition_balancer_backend.cc @@ -199,18 +199,19 @@ ss::future<> partition_balancer_backend::do_tick() { co_await ss::max_concurrent_for_each( plan_data.cancellations, 32, [this, current_term](model::ntp& ntp) { - vlog(clusterlog.info, "cancel movement for ntp {}", ntp); return _topics_frontend .cancel_moving_partition_replicas( ntp, model::timeout_clock::now() + add_move_cmd_timeout, current_term) .then([ntp = std::move(ntp)](auto errc) { - vlog( - clusterlog.info, - "{} movement cancellation submitted, errc: {}", - ntp, - errc); + if (errc) { + vlog( + clusterlog.warn, + "submitting {} movement cancellation failed, error: {}", + ntp, + errc.message()); + } }); }); @@ -218,12 +219,6 @@ ss::future<> partition_balancer_backend::do_tick() { plan_data.reassignments, 32, [this, current_term](ntp_reassignments& reassignment) { - vlog( - clusterlog.info, - "moving {} to {}", - reassignment.ntp, - reassignment.allocation_units.get_assignments().front().replicas); - return _topics_frontend .move_partition_replicas( reassignment.ntp, @@ -231,11 +226,13 @@ ss::future<> partition_balancer_backend::do_tick() { model::timeout_clock::now() + add_move_cmd_timeout, current_term) .then([reassignment = std::move(reassignment)](auto errc) { - vlog( - clusterlog.info, - "{} reassignment submitted, errc: {}", - reassignment.ntp, - errc); + if (errc) { + vlog( + clusterlog.warn, + "submitting {} reassignment failed, error: {}", + reassignment.ntp, + errc.message()); + } }); }); } diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index 4df1f4c3f5d5..8a358425fa39 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -293,6 +293,23 @@ result partition_balancer_planner::get_reallocation( return reallocation; } +void partition_balancer_planner::plan_data::add_reassignment( + model::ntp ntp, + const std::vector& orig_replicas, + allocation_units allocation_units, + std::string_view reason) { + vlog( + clusterlog.info, + "ntp: {}, planning move {} -> {} (reason: {})", + ntp, + orig_replicas, + allocation_units.get_assignments().front().replicas, + reason); + + reassignments.emplace_back(ntp_reassignments{ + .ntp = ntp, .allocation_units = std::move(allocation_units)}); +} + /* * Function is trying to move ntp out of unavailable nodes * It can move to nodes that are violating soft_max_disk_usage_ratio constraint @@ -348,9 +365,11 @@ void partition_balancer_planner::get_unavailable_nodes_reassignments( stable_replicas, rrs); if (new_allocation_units) { - result.reassignments.emplace_back(ntp_reassignments{ - .ntp = ntp, - .allocation_units = std::move(new_allocation_units.value())}); + result.add_reassignment( + ntp, + a.replicas, + std::move(new_allocation_units.value()), + "unavailable nodes"); } else { result.failed_reassignments_count += 1; } @@ -447,9 +466,11 @@ void partition_balancer_planner::get_rack_constraint_repair_reassignments( stable_replicas, rrs); if (new_allocation_units) { - result.reassignments.emplace_back(ntp_reassignments{ - .ntp = ntp, - 
.allocation_units = std::move(new_allocation_units.value())}); + result.add_reassignment( + ntp, + orig_replicas, + std::move(new_allocation_units.value()), + "rack constraint repair"); } else { result.failed_reassignments_count += 1; } @@ -600,10 +621,11 @@ void partition_balancer_planner::get_full_node_reassignments( rrs); if (new_allocation_units) { - result.reassignments.emplace_back(ntp_reassignments{ - .ntp = partition_to_move, - .allocation_units = std::move( - new_allocation_units.value())}); + result.add_reassignment( + partition_to_move, + current_assignments->replicas, + std::move(new_allocation_units.value()), + "full nodes"); success = true; break; } else { @@ -651,6 +673,13 @@ void partition_balancer_planner::get_unavailable_node_movement_cancellations( rrs.timed_out_unavailable_nodes.contains(r.node_id) && !previous_replicas_set.contains(r.node_id)) { if (!was_on_decommissioning_node) { + vlog( + clusterlog.info, + "ntp: {}, cancelling move {} -> {}", + update.first, + update.second.get_previous_replicas(), + current_assignments->replicas); + result.cancellations.push_back(update.first); } else { result.failed_reassignments_count += 1; diff --git a/src/v/cluster/partition_balancer_planner.h b/src/v/cluster/partition_balancer_planner.h index 5a0b8fbdf74f..3de75e4030c1 100644 --- a/src/v/cluster/partition_balancer_planner.h +++ b/src/v/cluster/partition_balancer_planner.h @@ -57,6 +57,12 @@ class partition_balancer_planner { std::vector cancellations; size_t failed_reassignments_count = 0; status status = status::empty; + + void add_reassignment( + model::ntp, + const std::vector& orig_replicas, + allocation_units, + std::string_view reason); }; plan_data plan_reassignments( From cebb295ff7c7f69186c84eac5e4fe6ee94172e1a Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 21 Oct 2022 19:01:55 +0300 Subject: [PATCH 09/10] c/partition_balancer: partition_balancer_state unit tests --- .../partition_balancer_planner_fixture.h | 91 ++++++++++++++++--- .../tests/partition_balancer_planner_test.cc | 42 +++++++++ 2 files changed, 118 insertions(+), 15 deletions(-) diff --git a/src/v/cluster/tests/partition_balancer_planner_fixture.h b/src/v/cluster/tests/partition_balancer_planner_fixture.h index 2b7ef4dea0b4..d52808b14f95 100644 --- a/src/v/cluster/tests/partition_balancer_planner_fixture.h +++ b/src/v/cluster/tests/partition_balancer_planner_fixture.h @@ -17,6 +17,7 @@ #include "cluster/tests/utils.h" #include "cluster/topic_updates_dispatcher.h" #include "model/metadata.h" +#include "random/generators.h" #include "test_utils/fixture.h" #include "units.h" @@ -115,26 +116,56 @@ struct partition_balancer_planner_fixture { return {cfg, std::move(pas)}; } - cluster::create_topic_cmd make_create_topic_cmd( - const ss::sstring& name, int partitions, int16_t replication_factor) { - return { - make_tp_ns(name), - make_tp_configuration(name, partitions, replication_factor)}; - } - model::topic_namespace make_tp_ns(const ss::sstring& tp) { return {test_ns, model::topic(tp)}; } - void create_topic( - const ss::sstring& name, int partitions, int16_t replication_factor) { - auto cmd = make_create_topic_cmd(name, partitions, replication_factor); + template + void dispatch_command(Cmd cmd) { auto res = workers.dispatcher .apply_update(serialize_cmd(std::move(cmd)).get()) .get(); BOOST_REQUIRE_EQUAL(res, cluster::errc::success); } + void create_topic( + const ss::sstring& name, int partitions, int16_t replication_factor) { + cluster::create_topic_cmd cmd{ + make_tp_ns(name), + 
make_tp_configuration(name, partitions, replication_factor)}; + dispatch_command(std::move(cmd)); + } + + void create_topic( + const ss::sstring& name, + std::vector> partition_nodes) { + BOOST_REQUIRE(!partition_nodes.empty()); + int16_t replication_factor = partition_nodes.front().size(); + cluster::topic_configuration cfg( + test_ns, + model::topic{name}, + partition_nodes.size(), + replication_factor); + + std::vector assignments; + for (size_t i = 0; i < partition_nodes.size(); ++i) { + const auto& nodes = partition_nodes[i]; + BOOST_REQUIRE_EQUAL(nodes.size(), replication_factor); + std::vector replicas; + for (model::node_id n : nodes) { + replicas.push_back(model::broker_shard{ + n, random_generators::get_int(0, 3)}); + } + assignments.push_back(cluster::partition_assignment{ + raft::group_id{1}, model::partition_id{i}, replicas}); + } + cluster::create_topic_cmd cmd{ + make_tp_ns(name), + cluster::topic_configuration_assignment{cfg, std::move(assignments)}}; + + dispatch_command(std::move(cmd)); + } + void allocator_register_nodes( size_t nodes_amount, const std::vector& rack_ids = {}) { if (!rack_ids.empty()) { @@ -184,11 +215,17 @@ struct partition_balancer_planner_fixture { void move_partition_replicas( const model::ntp& ntp, const std::vector& new_replicas) { - auto cmd = make_move_partition_replicas_cmd(ntp, new_replicas); - auto res = workers.dispatcher - .apply_update(serialize_cmd(std::move(cmd)).get()) - .get(); - BOOST_REQUIRE_EQUAL(res, cluster::errc::success); + dispatch_command(make_move_partition_replicas_cmd(ntp, new_replicas)); + } + + void move_partition_replicas( + model::ntp ntp, const std::vector& new_nodes) { + std::vector new_replicas; + for (auto n : new_nodes) { + new_replicas.push_back(model::broker_shard{ + n, random_generators::get_int(0, 3)}); + } + move_partition_replicas(std::move(ntp), std::move(new_replicas)); } void move_partition_replicas(cluster::ntp_reassignments& reassignment) { @@ -197,6 +234,30 @@ struct partition_balancer_planner_fixture { reassignment.allocation_units.get_assignments().front().replicas); } + void cancel_partition_move(model::ntp ntp) { + cluster::cancel_moving_partition_replicas_cmd cmd{ + std::move(ntp), + cluster::cancel_moving_partition_replicas_cmd_data{ + cluster::force_abort_update{false}}}; + dispatch_command(std::move(cmd)); + } + + void finish_partition_move(model::ntp ntp) { + auto cur_assignment = workers.table.local().get_partition_assignment( + ntp); + BOOST_REQUIRE(cur_assignment); + + cluster::finish_moving_partition_replicas_cmd cmd{ + std::move(ntp), cur_assignment->replicas}; + + dispatch_command(std::move(cmd)); + } + + void delete_topic(const model::topic& topic) { + cluster::delete_topic_cmd cmd{make_tp_ns(topic()), make_tp_ns(topic())}; + dispatch_command(std::move(cmd)); + } + std::vector create_follower_metrics(const std::set& unavailable_nodes = {}) { std::vector metrics; diff --git a/src/v/cluster/tests/partition_balancer_planner_test.cc b/src/v/cluster/tests/partition_balancer_planner_test.cc index 5c7df1673922..7e0459703734 100644 --- a/src/v/cluster/tests/partition_balancer_planner_test.cc +++ b/src/v/cluster/tests/partition_balancer_planner_test.cc @@ -14,6 +14,9 @@ static ss::logger logger("partition_balancer_planner"); +// a shorthand to avoid spelling out model::node_id +static model::node_id n(int64_t id) { return model::node_id{id}; }; + using namespace std::chrono_literals; void check_violations( @@ -821,6 +824,45 @@ FIXTURE_TEST( BOOST_REQUIRE_EQUAL(plan_data.failed_reassignments_count, 
1); } +FIXTURE_TEST( + test_state_ntps_with_broken_rack_constraint, + partition_balancer_planner_fixture) { + allocator_register_nodes(4, {"rack_A", "rack_B", "rack_B", "rack_C"}); + + model::topic topic{"topic-1"}; + model::ntp ntp0{test_ns, topic, 0}; + model::ntp ntp1{test_ns, topic, 1}; + + auto check_ntps = [&](absl::btree_set expected) { + const auto& ntps + = workers.state.local().ntps_with_broken_rack_constraint(); + BOOST_REQUIRE_EQUAL( + std::vector(ntps.begin(), ntps.end()), + std::vector(expected.begin(), expected.end())); + }; + + create_topic(topic(), {{n(0), n(1), n(2)}, {n(0), n(1), n(3)}}); + check_ntps({ntp0}); + + move_partition_replicas(ntp1, {n(0), n(1), n(2)}); + check_ntps({ntp0, ntp1}); + + move_partition_replicas(ntp0, {n(0), n(3), n(2)}); + check_ntps({ntp1}); + + cancel_partition_move(ntp0); + check_ntps({ntp0, ntp1}); + + finish_partition_move(ntp1); + check_ntps({ntp0, ntp1}); + + move_partition_replicas(ntp1, {n(0), n(2), n(3)}); + check_ntps({ntp0}); + + delete_topic(topic); + check_ntps({}); +} + /* * 4 nodes; 1 topic; 2 partitions with 3 replicas in 2 racks; * Planner should repair the rack awareness constraint. From 04e041e399d669026f3dd1903c2b1cf89ff8864e Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 28 Oct 2022 01:59:40 +0300 Subject: [PATCH 10/10] c/partition_balancer: early return in planner main func --- src/v/cluster/partition_balancer_planner.cc | 23 ++++++++++++--------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/v/cluster/partition_balancer_planner.cc b/src/v/cluster/partition_balancer_planner.cc index 8a358425fa39..ee110775d596 100644 --- a/src/v/cluster/partition_balancer_planner.cc +++ b/src/v/cluster/partition_balancer_planner.cc @@ -720,19 +720,22 @@ partition_balancer_planner::plan_reassignments( } if ( - !result.violations.is_empty() - || !_state.ntps_with_broken_rack_constraint().empty()) { - init_ntp_sizes_from_health_report(health_report, rrs); - get_unavailable_nodes_reassignments(result, rrs); - get_rack_constraint_repair_reassignments(result, rrs); - get_full_node_reassignments(result, rrs); - if (!result.reassignments.empty()) { - result.status = status::movement_planned; - } - + result.violations.is_empty() + && _state.ntps_with_broken_rack_constraint().empty()) { + result.status = status::empty; return result; } + init_ntp_sizes_from_health_report(health_report, rrs); + + get_unavailable_nodes_reassignments(result, rrs); + get_rack_constraint_repair_reassignments(result, rrs); + get_full_node_reassignments(result, rrs); + + if (!result.reassignments.empty()) { + result.status = status::movement_planned; + } + return result; }