
Commit

Merge pull request #9901 from vbotbuildovich/backport-fixes-to-v23.1.x-340

[v23.1.x] Topic-aware leadership balancing
piyushredpanda authored Apr 11, 2023
2 parents c97c219 + 10a2dc9 commit ef8e659
Showing 24 changed files with 2,260 additions and 243 deletions.
1 change: 1 addition & 0 deletions src/v/cluster/CMakeLists.txt
@@ -124,6 +124,7 @@ v_cc_library(
scheduling/constraints.cc
scheduling/leader_balancer.cc
scheduling/leader_balancer_probe.cc
scheduling/leader_balancer_constraints.cc
health_monitor_types.cc
health_monitor_backend.cc
health_monitor_frontend.cc
216 changes: 155 additions & 61 deletions src/v/cluster/scheduling/leader_balancer.cc
@@ -15,8 +15,12 @@
#include "cluster/members_table.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/scheduling/leader_balancer_greedy.h"
#include "cluster/scheduling/leader_balancer_random.h"
#include "cluster/scheduling/leader_balancer_strategy.h"
#include "cluster/scheduling/leader_balancer_types.h"
#include "cluster/shard_table.h"
#include "cluster/topic_table.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "raft/rpc_client_protocol.h"
#include "random/generators.h"
@@ -26,6 +30,7 @@
#include "vlog.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/timer.hh>

@@ -36,6 +41,7 @@

#include <algorithm>
#include <chrono>
#include <memory>

namespace cluster {

@@ -275,8 +281,35 @@ void leader_balancer::trigger_balance() {
});
}

bool leader_balancer::should_stop_balance() const {
return !_enabled() || _as.local().abort_requested();
}

static bool validate_indexes(
const leader_balancer_types::group_id_to_topic_revision_t& group_to_topic,
const leader_balancer_types::index_type& index) {
// Ensure every group in the shard index has a
// topic mapping in the group_to_topic index.
// It's an implicit assumption of the even_topic_distributon_constraint
// that this is the case.
for (const auto& broker_shard : index) {
for (const auto& group_p : broker_shard.second) {
auto topic_id_opt = group_to_topic.find(group_p.first);
if (topic_id_opt == group_to_topic.end()) {
vlog(
clusterlog.warn,
"no topic mapping in group_to_topic index for group: {}",
group_p.first);
return false;
}
}
}

return true;
}
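
The invariant above is load-bearing for the new constraint code: every raft group that appears anywhere in the shard index must resolve to a topic. A minimal sketch of the check with simplified stand-in types (assumptions on my part — the real index_type and group_id_to_topic_revision_t aliases are defined in leader_balancer_types.h and use different containers):

// Sketch of the validate_indexes() invariant with simplified stand-in types.
#include <cstdint>
#include <map>

using group_id = std::int64_t;  // stand-in for raft::group_id
using topic_rev = std::int64_t; // stand-in for a topic's creation revision
using shard_id = int;           // stand-in for model::broker_shard

using group_to_topic_t = std::map<group_id, topic_rev>;
using index_t = std::map<shard_id, std::map<group_id, int>>;

// Every group in the shard index must have a topic mapping;
// even_topic_distributon_constraint assumes this holds.
bool validate(const group_to_topic_t& g2t, const index_t& index) {
    for (const auto& [shard, groups] : index) {
        (void)shard;
        for (const auto& [group, payload] : groups) {
            (void)payload;
            if (g2t.find(group) == g2t.end()) {
                return false; // orphaned group: no owning topic recorded
            }
        }
    }
    return true;
}
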

ss::future<ss::stop_iteration> leader_balancer::balance() {
if (!_enabled()) {
if (should_stop_balance()) {
co_return ss::stop_iteration::yes;
}

@@ -353,20 +386,37 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
_need_controller_refresh = false;
}

if (_as.local().abort_requested()) {
co_return ss::stop_iteration::yes;
auto index = build_index();
auto group_id_to_topic = build_group_id_to_topic_rev();

if (!validate_indexes(group_id_to_topic, index)) {
vlog(clusterlog.warn, "Leadership balancer tick: invalid indexes.");
co_return ss::stop_iteration::no;
}

/*
* For simplicity the current implementation rebuilds the index on each
* rebalancing tick. Testing shows that this takes up to a couple
* hundred microseconds for up to 1000s of raft groups. This can be
* optimized later by attempting to minimize the number of rebuilds
* (e.g. on average little should change between ticks) and bounding the
* search for leader moves.
*/
greedy_balanced_shards strategy(build_index(), muted_nodes());
auto cores = strategy.stats();
auto mode = config::shard_local_cfg().leader_balancer_mode();
std::unique_ptr<leader_balancer_strategy> strategy;

switch (mode) {
case model::leader_balancer_mode::random_hill_climbing:
vlog(clusterlog.debug, "using random_hill_climbing");
strategy = std::make_unique<
leader_balancer_types::random_hill_climbing_strategy>(
std::move(index),
std::move(group_id_to_topic),
leader_balancer_types::muted_index{muted_nodes(), {}});
break;
case model::leader_balancer_mode::greedy_balanced_shards:
vlog(clusterlog.debug, "using greedy_balanced_shards");
strategy = std::make_unique<greedy_balanced_shards>(
std::move(index), muted_nodes());
break;
default:
vlog(clusterlog.error, "unexpected mode value: {}", mode);
co_return ss::stop_iteration::no;
}

auto cores = strategy->stats();

if (clusterlog.is_enabled(ss::log_level::trace)) {
for (const auto& core : cores) {
@@ -389,14 +439,19 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
co_return ss::stop_iteration::yes;
}

if (
_in_flight_changes.size() >= _transfer_limit_per_shard() * cores.size()) {
size_t allowed_change_cnt = 0;
size_t max_inflight_changes = _transfer_limit_per_shard() * cores.size();
if (_in_flight_changes.size() < max_inflight_changes) {
allowed_change_cnt = max_inflight_changes - _in_flight_changes.size();
}

if (allowed_change_cnt == 0) {
vlog(
clusterlog.debug,
"Leadership balancer tick: number of in flight changes is at max "
"allowable. Current in flight {}. Max allowable {}.",
_in_flight_changes.size(),
_transfer_limit_per_shard() * cores.size());
max_inflight_changes);

_throttled = true;

@@ -423,60 +478,71 @@
co_return ss::stop_iteration::yes;
}

auto error = strategy.error();
auto transfer = strategy.find_movement(muted_groups());
if (!transfer) {
vlog(
clusterlog.debug,
"No leadership balance improvements found with total delta {}, "
"number of muted groups {}",
error,
_muted.size());
if (!_timer.armed()) {
_timer.arm(_idle_timeout());
for (size_t i = 0; i < allowed_change_cnt; i++) {
if (should_stop_balance()) {
co_return ss::stop_iteration::yes;
}
_probe.leader_transfer_no_improvement();
co_return ss::stop_iteration::yes;
}

_in_flight_changes[transfer->group] = {
*transfer, clock_type::now() + _mute_timeout()};
check_register_leadership_change_notification();
auto transfer = strategy->find_movement(muted_groups());
if (!transfer) {
vlog(
clusterlog.debug,
"No leadership balance improvements found with total delta {}, "
"number of muted groups {}",
strategy->error(),
_muted.size());
if (!_timer.armed()) {
_timer.arm(_idle_timeout());
}
_probe.leader_transfer_no_improvement();
co_return ss::stop_iteration::yes;
}

auto success = co_await do_transfer(*transfer);
if (!success) {
vlog(
clusterlog.info,
"Error transferring leadership group {} from {} to {}",
transfer->group,
transfer->from,
transfer->to);
_in_flight_changes[transfer->group] = {
*transfer, clock_type::now() + _mute_timeout()};
check_register_leadership_change_notification();

_in_flight_changes.erase(transfer->group);
check_unregister_leadership_change_notification();
auto success = co_await do_transfer(*transfer);
if (!success) {
vlog(
clusterlog.info,
"Error transferring leadership group {} from {} to {}",
transfer->group,
transfer->from,
transfer->to);

_in_flight_changes.erase(transfer->group);
check_unregister_leadership_change_notification();

/*
* a common scenario is that a node loses all its leadership (e.g.
* restarts) and then it is recognized as having lots of extra
* capacity (which it does). but the balancer doesn't consider node
* health when making decisions. so when we fail transfer we inject
* a short delay to avoid spinning on sending transfer requests to a
* failed node. of course failure can happen for other reasons, so
* don't delay a lot.
*/
_probe.leader_transfer_error();
co_await ss::sleep_abortable(5s, _as.local());
co_return ss::stop_iteration::no;

} else {
_probe.leader_transfer_succeeded();
strategy->apply_movement(*transfer);
}

/*
* a common scenario is that a node loses all its leadership (e.g.
* restarts) and then it is recognized as having lots of extra capacity
* (which it does). but the balancer doesn't consider node health when
* making decisions. so when we fail transfer we inject a short delay
* to avoid spinning on sending transfer requests to a failed node. of
* course failure can happen for other reasons, so don't delay a lot.
* if leadership moved, or it timed out we'll mute the group for a while
* and continue to avoid any thrashing. notice that we don't check for
* movement to the exact shard we requested. this is because we want to
* avoid thrashing (we'll still mute the group), but also because we may
* have simply been racing with organic leadership movement.
*/
_probe.leader_transfer_error();
co_await ss::sleep_abortable(5s, _as.local());
_muted.try_emplace(
transfer->group, clock_type::now() + _mute_timeout());
}

_probe.leader_transfer_succeeded();

/*
* if leadership moved, or it timed out we'll mute the group for a while and
* continue to avoid any thrashing. notice that we don't check for movement
* to the exact shard we requested. this is because we want to avoid
* thrashing (we'll still mute the group), but also because we may have
* simply been racing with organic leadership movement.
*/
_muted.try_emplace(transfer->group, clock_type::now() + _mute_timeout());
co_return ss::stop_iteration::no;
}
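
The throttle earlier in the tick computes a per-tick budget (allowed_change_cnt) rather than the old all-or-nothing cutoff, and that budget bounds the new movement loop. A worked example of the arithmetic, with hypothetical values for _transfer_limit_per_shard() and the core count reported by stats():

// Worked example of the per-tick transfer budget; the numbers are illustrative.
#include <cassert>
#include <cstddef>

std::size_t allowed_change_cnt(
  std::size_t limit_per_shard, std::size_t cores, std::size_t in_flight) {
    const std::size_t max_inflight = limit_per_shard * cores;
    return in_flight < max_inflight ? max_inflight - in_flight : 0;
}

int main() {
    assert(allowed_change_cnt(512, 8, 0) == 4096);  // idle: full budget
    assert(allowed_change_cnt(512, 8, 4000) == 96); // nearly full: 96 moves this tick
    assert(allowed_change_cnt(512, 8, 5000) == 0);  // saturated: tick is throttled
}
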

@@ -521,6 +587,34 @@ absl::flat_hash_set<raft::group_id> leader_balancer::muted_groups() const {
return res;
}

leader_balancer_types::group_id_to_topic_revision_t
leader_balancer::build_group_id_to_topic_rev() const {
leader_balancer_types::group_id_to_topic_revision_t group_id_to_topic_rev;

// for each ntp in the cluster
for (const auto& topic : _topics.topics_map()) {
if (!topic.second.is_topic_replicable()) {
continue;
}

for (const auto& partition : topic.second.get_assignments()) {
if (partition.replicas.empty()) {
vlog(
clusterlog.warn,
"Leadership encountered partition with no partition "
"assignment: {}",
model::ntp(topic.first.ns, topic.first.tp, partition.id));
continue;
}

group_id_to_topic_rev.try_emplace(
partition.group, topic.second.get_revision());
}
}

return group_id_to_topic_rev;
}
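
This group-to-topic mapping is what makes the balancing topic-aware: combined with the shard index, a strategy can tally leaders per topic per shard and penalize topics whose leaders clump together. A rough sketch of that derived view, reusing the simplified stand-in types from the earlier sketch (illustrative only — not the actual code in leader_balancer_constraints.cc):

// Sketch: derive a per-topic, per-shard leader count from the two indexes.
#include <cstddef>
#include <cstdint>
#include <map>

using group_id = std::int64_t;
using topic_rev = std::int64_t;
using shard_id = int;

std::map<topic_rev, std::map<shard_id, std::size_t>> leaders_per_topic(
  const std::map<shard_id, std::map<group_id, int>>& index,
  const std::map<group_id, topic_rev>& group_to_topic) {
    std::map<topic_rev, std::map<shard_id, std::size_t>> tally;
    for (const auto& [shard, groups] : index) {
        for (const auto& [group, payload] : groups) {
            (void)payload;
            // validate_indexes() guarantees this lookup succeeds
            ++tally[group_to_topic.at(group)][shard];
        }
    }
    return tally;
}
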

/*
* builds an index that maps each core in the cluster to the set of replica
* groups such that the leader of each mapped replica group is on the given
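
Everything balance() needs from a strategy is the small virtual surface exercised in the diff above — stats(), error(), find_movement(), and apply_movement(). A minimal sketch of that shape, with simplified signatures and placeholder implementations (stats() omitted for brevity; the real declarations live in leader_balancer_strategy.h and differ in types):

// Minimal sketch of the strategy polymorphism used by balance().
#include <cstdint>
#include <memory>
#include <optional>

struct reassignment { // simplified: a group moved from one shard to another
    std::int64_t group;
    int from;
    int to;
};

struct strategy {
    virtual ~strategy() = default;
    virtual double error() const = 0; // current imbalance; 0 means balanced
    virtual std::optional<reassignment> find_movement() = 0; // next improving move
    virtual void apply_movement(const reassignment&) = 0;    // keep index in sync
};

// Placeholders standing in for greedy_balanced_shards and
// random_hill_climbing_strategy.
struct greedy final : strategy {
    double error() const override { return 0.0; }
    std::optional<reassignment> find_movement() override { return std::nullopt; }
    void apply_movement(const reassignment&) override {}
};
struct random_hill_climbing final : strategy {
    double error() const override { return 0.0; }
    std::optional<reassignment> find_movement() override { return std::nullopt; }
    void apply_movement(const reassignment&) override {}
};

// balance() picks one per tick from the leader_balancer_mode property:
std::unique_ptr<strategy> make_strategy(bool random_hill_climbing_mode) {
    if (random_hill_climbing_mode) {
        return std::make_unique<random_hill_climbing>();
    }
    return std::make_unique<greedy>();
}
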
5 changes: 5 additions & 0 deletions src/v/cluster/scheduling/leader_balancer.h
@@ -13,6 +13,7 @@
#include "cluster/partition_manager.h"
#include "cluster/scheduling/leader_balancer_probe.h"
#include "cluster/scheduling/leader_balancer_strategy.h"
#include "cluster/scheduling/leader_balancer_types.h"
#include "cluster/types.h"
#include "raft/consensus.h"
#include "raft/consensus_client_protocol.h"
@@ -92,6 +93,8 @@ class leader_balancer {
using index_type = leader_balancer_strategy::index_type;
using reassignment = leader_balancer_strategy::reassignment;

leader_balancer_types::group_id_to_topic_revision_t
build_group_id_to_topic_rev() const;
index_type build_index();
absl::flat_hash_set<raft::group_id> muted_groups() const;
absl::flat_hash_set<model::node_id> muted_nodes() const;
@@ -117,6 +120,8 @@
void trigger_balance();
ss::future<ss::stop_iteration> balance();

bool should_stop_balance() const;

// On/off switch: when off, leader balancer tick will run
// but do nothing
config::binding<bool> _enabled;
