[v23.1.x] Topic-aware leadership balancing #9901

Merged
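In broad terms, this backport teaches the leader balancer to consider where each topic's partition leaders sit, not just the total leader count per shard. The actual objective lives in the new `leader_balancer_constraints.cc` (collapsed in this view); the snippet below is only a rough, self-contained illustration of the idea — the struct names and the scoring formula are assumptions for the example, not code from this PR:

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Toy model (assumption, not this PR's code): each partition leader sits on a
// shard. A topic-aware score penalizes topics whose leaders pile up on a few
// shards, even when the per-shard totals across all topics look balanced.
struct leader { std::string topic; int shard; };

double topic_skew(const std::vector<leader>& leaders, int shard_count) {
    std::map<std::string, std::map<int, std::size_t>> per_topic;
    for (const auto& l : leaders) {
        per_topic[l.topic][l.shard]++;
    }
    double skew = 0;
    for (const auto& [topic, by_shard] : per_topic) {
        std::size_t total = 0, max_on_one = 0;
        for (const auto& [shard, n] : by_shard) {
            total += n;
            max_on_one = std::max(max_on_one, n);
        }
        // Anything above a topic's even share on a single shard adds skew.
        skew += double(max_on_one) - double(total) / shard_count;
    }
    return skew;
}

int main() {
    // Both shards hold 3 leaders each (globally balanced), but topic "a" is
    // concentrated on shard 0 and topic "b" on shard 1.
    std::vector<leader> ls = {
      {"a", 0}, {"a", 0}, {"b", 1}, {"b", 1}, {"c", 0}, {"c", 1}};
    std::cout << topic_skew(ls, 2) << "\n"; // 2: topics "a" and "b" each add 1
}
```

A purely shard-count-based balancer would call that example already balanced; the topic-aware view is what this PR adds on top of the existing greedy strategy.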
1 change: 1 addition & 0 deletions src/v/cluster/CMakeLists.txt
@@ -124,6 +124,7 @@ v_cc_library(
scheduling/constraints.cc
scheduling/leader_balancer.cc
scheduling/leader_balancer_probe.cc
scheduling/leader_balancer_constraints.cc
health_monitor_types.cc
health_monitor_backend.cc
health_monitor_frontend.cc
216 changes: 155 additions & 61 deletions src/v/cluster/scheduling/leader_balancer.cc
@@ -15,8 +15,12 @@
#include "cluster/members_table.h"
#include "cluster/partition_leaders_table.h"
#include "cluster/scheduling/leader_balancer_greedy.h"
#include "cluster/scheduling/leader_balancer_random.h"
#include "cluster/scheduling/leader_balancer_strategy.h"
#include "cluster/scheduling/leader_balancer_types.h"
#include "cluster/shard_table.h"
#include "cluster/topic_table.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "raft/rpc_client_protocol.h"
#include "random/generators.h"
@@ -26,6 +30,7 @@
#include "vlog.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/loop.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/timer.hh>

@@ -36,6 +41,7 @@

#include <algorithm>
#include <chrono>
#include <memory>

namespace cluster {

@@ -275,8 +281,35 @@ void leader_balancer::trigger_balance() {
});
}

bool leader_balancer::should_stop_balance() const {
return !_enabled() || _as.local().abort_requested();
}

static bool validate_indexes(
const leader_balancer_types::group_id_to_topic_revision_t& group_to_topic,
const leader_balancer_types::index_type& index) {
// Ensure every group in the shard index has a
// topic mapping in the group_to_topic index.
// It's an implicit assumption of the even_topic_distributon_constraint
// that this is the case.
for (const auto& broker_shard : index) {
for (const auto& group_p : broker_shard.second) {
auto topic_id_opt = group_to_topic.find(group_p.first);
if (topic_id_opt == group_to_topic.end()) {
vlog(
clusterlog.warn,
"no topic mapping in group_to_topic index for group: {}",
group_p.first);
return false;
}
}
}

return true;
}
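
As an aside on the check above, here is a minimal standalone sketch of what `validate_indexes` guards against. The container aliases are simplified stand-ins, not the real definitions from `leader_balancer_types.h`:

```cpp
#include <cstdint>
#include <iostream>
#include <map>

// Simplified stand-ins (assumptions): index_type maps each broker shard to
// the raft groups currently led from it, and group_id_to_topic_revision_t
// maps each raft group to the revision of the topic it belongs to.
using group_id = int64_t;
using shard_id = int;
using topic_rev = int64_t;
using shard_index = std::map<shard_id, std::map<group_id, shard_id>>;
using group_to_topic = std::map<group_id, topic_rev>;

// Every group visible in the shard index must be attributable to a topic,
// otherwise a topic-aware constraint has nothing to group it under.
bool validate(const group_to_topic& g2t, const shard_index& index) {
    for (const auto& [shard, groups] : index) {
        for (const auto& [group, leader_shard] : groups) {
            if (g2t.find(group) == g2t.end()) {
                std::cerr << "no topic mapping for group " << group << "\n";
                return false;
            }
        }
    }
    return true;
}

int main() {
    shard_index idx{{0, {{1, 0}, {2, 0}}}};
    group_to_topic g2t{{1, 100}}; // group 2 has no topic mapping
    std::cout << std::boolalpha << validate(g2t, idx) << "\n"; // prints false
}
```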

ss::future<ss::stop_iteration> leader_balancer::balance() {
if (!_enabled()) {
if (should_stop_balance()) {
co_return ss::stop_iteration::yes;
}

@@ -353,20 +386,37 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
_need_controller_refresh = false;
}

if (_as.local().abort_requested()) {
co_return ss::stop_iteration::yes;
auto index = build_index();
auto group_id_to_topic = build_group_id_to_topic_rev();

if (!validate_indexes(group_id_to_topic, index)) {
vlog(clusterlog.warn, "Leadership balancer tick: invalid indexes.");
co_return ss::stop_iteration::no;
}

/*
* For simplicity the current implementation rebuilds the index on each
* rebalancing tick. Testing shows that this takes up to a couple
* hundred microseconds for up to 1000s of raft groups. This can be
* optimized later by attempting to minimize the number of rebuilds
* (e.g. on average little should change between ticks) and bounding the
* search for leader moves.
*/
greedy_balanced_shards strategy(build_index(), muted_nodes());
auto cores = strategy.stats();
auto mode = config::shard_local_cfg().leader_balancer_mode();
std::unique_ptr<leader_balancer_strategy> strategy;

switch (mode) {
case model::leader_balancer_mode::random_hill_climbing:
vlog(clusterlog.debug, "using random_hill_climbing");
strategy = std::make_unique<
leader_balancer_types::random_hill_climbing_strategy>(
std::move(index),
std::move(group_id_to_topic),
leader_balancer_types::muted_index{muted_nodes(), {}});
break;
case model::leader_balancer_mode::greedy_balanced_shards:
vlog(clusterlog.debug, "using greedy_balanced_shards");
strategy = std::make_unique<greedy_balanced_shards>(
std::move(index), muted_nodes());
break;
default:
vlog(clusterlog.error, "unexpected mode value: {}", mode);
co_return ss::stop_iteration::no;
}

auto cores = strategy->stats();

if (clusterlog.is_enabled(ss::log_level::trace)) {
for (const auto& core : cores) {
@@ -389,14 +439,19 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
co_return ss::stop_iteration::yes;
}

if (
_in_flight_changes.size() >= _transfer_limit_per_shard() * cores.size()) {
size_t allowed_change_cnt = 0;
size_t max_inflight_changes = _transfer_limit_per_shard() * cores.size();
if (_in_flight_changes.size() < max_inflight_changes) {
allowed_change_cnt = max_inflight_changes - _in_flight_changes.size();
}

if (allowed_change_cnt == 0) {
vlog(
clusterlog.debug,
"Leadership balancer tick: number of in flight changes is at max "
"allowable. Current in flight {}. Max allowable {}.",
_in_flight_changes.size(),
_transfer_limit_per_shard() * cores.size());
max_inflight_changes);

_throttled = true;
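
The budget computation above in one line: the per-tick allowance is `transfer_limit_per_shard * cores` minus what is already in flight, floored at zero. A tiny worked example with made-up numbers (the actual default for the per-shard transfer limit is not shown in this diff):

```cpp
#include <cstddef>
#include <iostream>

// allowed = max(0, limit_per_shard * cores - in_flight), written the way the
// patch does to avoid unsigned underflow when in_flight exceeds the cap.
std::size_t allowed_changes(
  std::size_t limit_per_shard, std::size_t cores, std::size_t in_flight) {
    std::size_t max_inflight = limit_per_shard * cores;
    return in_flight < max_inflight ? max_inflight - in_flight : 0;
}

int main() {
    // With a (made-up) limit of 8 transfers per shard, 4 shards reporting and
    // 30 transfers already in flight, this tick may issue 2 more moves.
    std::cout << allowed_changes(8, 4, 30) << "\n"; // 2
}
```

Unlike the old check, which returned early for the whole tick once the cap was reached, the loop that follows performs up to `allowed_change_cnt` transfers per tick.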

@@ -423,60 +478,71 @@ ss::future<ss::stop_iteration> leader_balancer::balance() {
co_return ss::stop_iteration::yes;
}

auto error = strategy.error();
auto transfer = strategy.find_movement(muted_groups());
if (!transfer) {
vlog(
clusterlog.debug,
"No leadership balance improvements found with total delta {}, "
"number of muted groups {}",
error,
_muted.size());
if (!_timer.armed()) {
_timer.arm(_idle_timeout());
for (size_t i = 0; i < allowed_change_cnt; i++) {
if (should_stop_balance()) {
co_return ss::stop_iteration::yes;
}
_probe.leader_transfer_no_improvement();
co_return ss::stop_iteration::yes;
}

_in_flight_changes[transfer->group] = {
*transfer, clock_type::now() + _mute_timeout()};
check_register_leadership_change_notification();
auto transfer = strategy->find_movement(muted_groups());
if (!transfer) {
vlog(
clusterlog.debug,
"No leadership balance improvements found with total delta {}, "
"number of muted groups {}",
strategy->error(),
_muted.size());
if (!_timer.armed()) {
_timer.arm(_idle_timeout());
}
_probe.leader_transfer_no_improvement();
co_return ss::stop_iteration::yes;
}

auto success = co_await do_transfer(*transfer);
if (!success) {
vlog(
clusterlog.info,
"Error transferring leadership group {} from {} to {}",
transfer->group,
transfer->from,
transfer->to);
_in_flight_changes[transfer->group] = {
*transfer, clock_type::now() + _mute_timeout()};
check_register_leadership_change_notification();

_in_flight_changes.erase(transfer->group);
check_unregister_leadership_change_notification();
auto success = co_await do_transfer(*transfer);
if (!success) {
vlog(
clusterlog.info,
"Error transferring leadership group {} from {} to {}",
transfer->group,
transfer->from,
transfer->to);

_in_flight_changes.erase(transfer->group);
check_unregister_leadership_change_notification();

/*
* a common scenario is that a node loses all its leadership (e.g.
* restarts) and then it is recognized as having lots of extra
* capacity (which it does). but the balancer doesn't consider node
* health when making decisions. so when we fail transfer we inject
* a short delay to avoid spinning on sending transfer requests to a
* failed node. of course failure can happen for other reasons, so
* don't delay a lot.
*/
_probe.leader_transfer_error();
co_await ss::sleep_abortable(5s, _as.local());
co_return ss::stop_iteration::no;

} else {
_probe.leader_transfer_succeeded();
strategy->apply_movement(*transfer);
}

/*
* a common scenario is that a node loses all its leadership (e.g.
* restarts) and then it is recognized as having lots of extra capacity
* (which it does). but the balancer doesn't consider node health when
* making decisions. so when we fail transfer we inject a short delay
* to avoid spinning on sending transfer requests to a failed node. of
* course failure can happen for other reasons, so don't delay a lot.
* if leadership moved, or it timed out we'll mute the group for a while
* and continue to avoid any thrashing. notice that we don't check for
* movement to the exact shard we requested. this is because we want to
* avoid thrashing (we'll still mute the group), but also because we may
* have simply been racing with organic leadership movement.
*/
_probe.leader_transfer_error();
co_await ss::sleep_abortable(5s, _as.local());
_muted.try_emplace(
transfer->group, clock_type::now() + _mute_timeout());
}

_probe.leader_transfer_succeeded();

/*
* if leadership moved, or it timed out we'll mute the group for a while and
* continue to avoid any thrashing. notice that we don't check for movement
* to the exact shard we requested. this is because we want to avoid
* thrashing (we'll still mute the group), but also because we may have
* simply been racing with organic leadership movement.
*/
_muted.try_emplace(transfer->group, clock_type::now() + _mute_timeout());
co_return ss::stop_iteration::no;
}
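
For context on the muting step at the end of each iteration, this is a minimal sketch of the assumed semantics of `_muted`: a map from raft group to an expiry deadline, consulted so a recently moved (or timed-out) group is left alone until the deadline passes. The pruning of expired entries happens elsewhere and is not part of this diff:

```cpp
#include <chrono>
#include <iostream>
#include <map>

using clock_type = std::chrono::steady_clock;
using group_id = long;

// Assumed shape of _muted: raft group -> deadline before which the balancer
// should not try to move that group's leadership again.
std::map<group_id, clock_type::time_point> muted;

bool is_muted(group_id g, clock_type::time_point now) {
    auto it = muted.find(g);
    return it != muted.end() && it->second > now;
}

int main() {
    auto now = clock_type::now();
    // Mirrors the try_emplace in the patch: mute after a transfer or timeout.
    muted.try_emplace(42, now + std::chrono::minutes(5));
    std::cout << std::boolalpha << is_muted(42, now) << "\n"; // true
}
```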

@@ -521,6 +587,34 @@ absl::flat_hash_set<raft::group_id> leader_balancer::muted_groups() const {
return res;
}

leader_balancer_types::group_id_to_topic_revision_t
leader_balancer::build_group_id_to_topic_rev() const {
leader_balancer_types::group_id_to_topic_revision_t group_id_to_topic_rev;

// for each ntp in the cluster
for (const auto& topic : _topics.topics_map()) {
if (!topic.second.is_topic_replicable()) {
continue;
}

for (const auto& partition : topic.second.get_assignments()) {
if (partition.replicas.empty()) {
vlog(
clusterlog.warn,
"Leadership encountered partition with no partition "
"assignment: {}",
model::ntp(topic.first.ns, topic.first.tp, partition.id));
continue;
}

group_id_to_topic_rev.try_emplace(
partition.group, topic.second.get_revision());
}
}

return group_id_to_topic_rev;
}

/*
* builds an index that maps each core in the cluster to the set of replica
* groups such that the leader of each mapped replica group is on the given
5 changes: 5 additions & 0 deletions src/v/cluster/scheduling/leader_balancer.h
@@ -13,6 +13,7 @@
#include "cluster/partition_manager.h"
#include "cluster/scheduling/leader_balancer_probe.h"
#include "cluster/scheduling/leader_balancer_strategy.h"
#include "cluster/scheduling/leader_balancer_types.h"
#include "cluster/types.h"
#include "raft/consensus.h"
#include "raft/consensus_client_protocol.h"
@@ -92,6 +93,8 @@ class leader_balancer {
using index_type = leader_balancer_strategy::index_type;
using reassignment = leader_balancer_strategy::reassignment;

leader_balancer_types::group_id_to_topic_revision_t
build_group_id_to_topic_rev() const;
index_type build_index();
absl::flat_hash_set<raft::group_id> muted_groups() const;
absl::flat_hash_set<model::node_id> muted_nodes() const;
@@ -117,6 +120,8 @@
void trigger_balance();
ss::future<ss::stop_iteration> balance();

bool should_stop_balance() const;

// On/off switch: when off, leader balancer tick will run
// but do nothing
config::binding<bool> _enabled;