c/members_backend: use reservoir sampling to choose replicas to move
Using reservoir sampling to choose the replicas that are going to be
reallocated to the new node provides better uniformity of topic replica
distribution across the cluster. Additionally, the reservoir sampling
algorithm isn't prone to selecting too few reallocations.

Signed-off-by: Michal Maslanka <michal@redpanda.com>
mmaslankaprv committed Mar 23, 2023
1 parent 8000bc5 commit 969a8be
Showing 1 changed file with 151 additions and 110 deletions.
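
For reference, here is a minimal standalone sketch of the reservoir sampling scheme (Algorithm R) that the commit message describes; the function name, item type, and RNG choice are illustrative and are not taken from the diff itself:

#include <cstddef>
#include <random>
#include <vector>

// Select k items uniformly at random from a stream of unknown length
// (Algorithm R). Every item ends up in the sample with probability
// k / n, so a single pass never picks too few candidates the way a
// fixed per-item coin flip can.
template<typename T>
std::vector<T> reservoir_sample(const std::vector<T>& stream, std::size_t k) {
    std::vector<T> reservoir;
    reservoir.reserve(k);
    std::mt19937_64 rng{std::random_device{}()};
    for (std::size_t i = 0; i < stream.size(); ++i) {
        if (reservoir.size() < k) {
            // fill phase: take the first k items unconditionally
            reservoir.push_back(stream[i]);
        } else {
            // replacement phase: keep item i with probability k / (i + 1)
            std::uniform_int_distribution<std::size_t> dist(0, i);
            if (const auto j = dist(rng); j < k) {
                reservoir[j] = stream[i];
            }
        }
    }
    return reservoir;
}

The diff below applies the same idea to partition moves: the first effective_batch_size candidate reallocations fill the batch, and each subsequent candidate replaces a random earlier pick with decreasing probability.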
261 changes: 151 additions & 110 deletions src/v/cluster/members_backend.cc
@@ -90,12 +90,6 @@ void reassign_replicas(
partition_allocator& allocator,
partition_assignment& current_assignment,
members_backend::partition_reallocation& reallocation) {
vlog(
clusterlog.debug,
"[ntp: {}, {} -> -] trying to reassign partition replicas",
reallocation.ntp,
current_assignment.replicas);

// remove nodes that are going to be reassigned from current assignment.
std::erase_if(
current_assignment.replicas,
@@ -179,10 +173,6 @@
**/
unevenness_error_info calculate_unevenness_error(
const partition_allocator& allocator, partition_allocation_domain domain) {
static const std::vector<partition_allocation_domain> domains{
partition_allocation_domains::consumer_offsets,
partition_allocation_domains::common};

const auto node_cnt = allocator.state().available_nodes();

const auto node_replicas = calculate_replicas_per_node(allocator, domain);
@@ -210,10 +200,12 @@ unevenness_error_info calculate_unevenness_error(

vlog(
clusterlog.trace,
"node {} has {} replicas allocated, requested replicas per node "
"node {} has {} replicas allocated in domain {}, requested replicas "
"per node "
"{}, difference: {}",
id,
allocation_info.allocated_replicas,
domain,
target_replicas_per_node,
diff);
err += std::abs(diff);
@@ -466,27 +458,20 @@ void members_backend::default_reallocation_strategy::
max_batch_size, allocator, topics, meta, domain);
auto current_error = calculate_unevenness_error(allocator, domain);
auto [it, _] = meta.last_unevenness_error.try_emplace(domain, 1.0);
const auto min_improvement
= std::max<size_t>(
(meta.partition_reallocations.size() - prev_reallocations_count) / 10,
1)
* current_error.e_step;

auto improvement = it->second - current_error.e;
vlog(
clusterlog.info,
"[update: {}] unevenness error: {}, previous error: {}, "
"improvement: {}, min improvement: {}",
"[update: {}] unevenness error: {}, previous error: {}, improvement: {}",
meta.update,
current_error.e,
it->second,
improvement,
min_improvement);
improvement);

it->second = current_error.e;

// drop all reallocations if there is no improvement
if (improvement < min_improvement) {
if (improvement < 0) {
meta.partition_reallocations.erase(
std::next(
meta.partition_reallocations.begin(),
@@ -572,6 +557,34 @@ bool is_reassigned_to_node(
node_id);
}

double error_after_move(
size_t total_replicas,
const node_replicas_map_t& replicas_per_node,
const partition_allocator& allocator,
const std::vector<model::broker_shard>& new_replica_set,
const std::vector<model::broker_shard>& old_replica_set) {
const auto total_nodes = allocator.state().available_nodes();

auto added_nodes = subtract_replica_sets(new_replica_set, old_replica_set);
auto removed_nodes = subtract_replica_sets(
old_replica_set, new_replica_set);
auto target_replicas_per_node = total_replicas / total_nodes;

double err = 0;

for (auto& [id, allocation_info] : replicas_per_node) {
double diff = static_cast<double>(target_replicas_per_node)
- static_cast<double>(
allocation_info.allocated_replicas
+ contains_node(added_nodes, id)
- contains_node(removed_nodes, id));

err += std::abs(diff);
}
err /= (static_cast<double>(total_replicas) * total_nodes);
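    // Illustrative check of the metric (numbers are examples, not from the
    // commit): total_replicas = 12 on total_nodes = 3 gives
    // target_replicas_per_node = 4; nodes holding {6, 4, 2} replicas yield
    // err = (2 + 0 + 2) / (12 * 3) ≈ 0.11, while an even {4, 4, 4} layout
    // yields 0.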
return err;
}

void members_backend::default_reallocation_strategy::
calculate_reallocations_batch(
size_t max_batch_size,
@@ -603,19 +616,6 @@ void members_backend::default_reallocation_strategy::
to_move_from_node.emplace_back(id, to_move);
}

auto all_empty = std::all_of(
to_move_from_node.begin(),
to_move_from_node.end(),
[](const replicas_to_move& m) { return m.left_to_move == 0; });
// nothing to do, exit early
if (all_empty) {
return;
}

auto cmp = [](const replicas_to_move& lhs, const replicas_to_move& rhs) {
return lhs.left_to_move < rhs.left_to_move;
};

if (clusterlog.is_enabled(ss::log_level::info)) {
for (const auto& [id, cnt] : to_move_from_node) {
vlog(
@@ -629,105 +629,146 @@
node_replicas[id].allocated_replicas);
}
}
auto [idx_it, _] = meta.last_ntp_index.try_emplace(domain, 0);
size_t current_idx = 0;
// 4. Pass over all partition metadata
for (auto& [tp_ns, metadata] : topics.topics_map()) {
// skip partitions outside of current domain
if (get_allocation_domain(tp_ns) != domain) {
continue;
}
// do not try to move internal partitions
if (
tp_ns.ns == model::kafka_internal_namespace
|| tp_ns.ns == model::redpanda_ns) {
continue;
}
if (!metadata.is_topic_replicable()) {
continue;

static const auto cmp =
[](const replicas_to_move& lhs, const replicas_to_move& rhs) {
return lhs.left_to_move < rhs.left_to_move;
};

auto initial_error = error_after_move(
total_replicas, node_replicas, allocator, {}, {});

std::erase_if(to_move_from_node, [](const replicas_to_move& to_move) {
return to_move.left_to_move <= 0;
});

std::sort(to_move_from_node.begin(), to_move_from_node.end(), cmp);

if (to_move_from_node.empty()) {
return;
}

auto to_move_it = to_move_from_node.begin();
while (meta.partition_reallocations.empty()
&& to_move_it != to_move_from_node.end()) {
auto& to_move = *to_move_it;

size_t effective_batch_size = std::min<size_t>(
to_move.left_to_move,
max_batch_size - meta.partition_reallocations.size());

if (effective_batch_size <= 0) {
return;
}
// do not move topics that were created after node was added, they are
// allocated with new cluster capacity
if (meta.update) {
if (metadata.get_revision() > meta.update->offset()) {
vlog(
clusterlog.debug,
"skipping reallocating topic {}, its revision {} is greater "
"than "
"node update {}",
tp_ns,
metadata.get_revision(),
meta.update->offset);

meta.partition_reallocations.reserve(effective_batch_size);
size_t idx = 0;
for (auto& [tp_ns, metadata] : topics.topics_map()) {
// skip partitions outside of current domain
if (get_allocation_domain(tp_ns) != domain) {
continue;
}
}
size_t rand_idx = 0;
/**
* We use 64 bit field initialized with random number to sample topic
* partitions to move, every time the bitfield is exhausted we
* reinitialize it and reset counter. This way we chose random
* partitions to move without iterating multiple times over the topic
* set
*/
std::bitset<sizeof(uint64_t)> sampling_bytes;
for (const auto& p : metadata.get_assignments()) {
if (idx_it->second > current_idx++) {
// do not try to move internal partitions
if (
tp_ns.ns == model::kafka_internal_namespace
|| tp_ns.ns == model::redpanda_ns) {
continue;
}

if (rand_idx % sizeof(uint64_t) == 0) {
sampling_bytes = random_generators::get_int(
std::numeric_limits<uint64_t>::max());
rand_idx = 0;
}

idx_it->second++;

if (sampling_bytes.test(rand_idx++)) {
if (!metadata.is_topic_replicable()) {
continue;
}
// do not move topics that were created after node was added, they
// are allocated with new cluster capacity
if (meta.update) {
if (metadata.get_revision() > meta.update->offset()) {
vlog(
clusterlog.debug,
"skipping reallocating topic {}, its revision {} is "
"greater than node update {}",
tp_ns,
metadata.get_revision(),
meta.update->offset);
continue;
}
}

std::erase_if(to_move_from_node, [](const replicas_to_move& v) {
return v.left_to_move == 0;
});

std::sort(to_move_from_node.begin(), to_move_from_node.end(), cmp);
for (auto& to_move : to_move_from_node) {
if (
is_in_replica_set(p.replicas, to_move.id)
&& to_move.left_to_move > 0) {
for (const auto& p : metadata.get_assignments()) {
if (is_in_replica_set(p.replicas, to_move.id)) {
partition_reallocation reallocation(
model::ntp(tp_ns.ns, tp_ns.tp, p.id), p.replicas.size());
reallocation.replicas_to_remove.emplace(to_move.id);
auto current_assignment = p;
reassign_replicas(
allocator, current_assignment, reallocation);
// skip if partition was reassigned to the same node
if (is_reassigned_to_node(reallocation, to_move.id)) {

if (!reallocation.allocation_units.has_value()) {
continue;
}
const auto& new_assignment = reallocation.allocation_units
.value()
.get_assignments()
.begin()
->replicas;

auto new_error = error_after_move(
total_replicas,
node_replicas,
allocator,
new_assignment,
p.replicas);
vlog(
clusterlog.trace,
"ntp {} move from {} -> {}, error before {}, error "
"after: {}",
reallocation.ntp,
p.replicas,
new_assignment,
initial_error,
new_error);
/**
* If there is no error improvement, skip this reallocation
*/
if (new_error >= initial_error) {
continue;
}
idx++;
reallocation.current_replica_set = p.replicas;
reallocation.state = reallocation_state::reassigned;
meta.partition_reallocations.push_back(
std::move(reallocation));
to_move.left_to_move--;
// reached max concurrent reallocations, yield
if (meta.partition_reallocations.size() >= max_batch_size) {
vlog(
clusterlog.info,
"reached limit of max concurrent reallocations: {}",
meta.partition_reallocations.size());
return;
/**
* Reservoir sampling part, after we have at least required
* number of moves replace one of the reallocations with
* decreasing probability.
*/
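/**
 * Invariant sketch (classic Algorithm R): once the batch is full, the
 * idx-th accepted candidate replaces an existing pick with probability
 * effective_batch_size / idx, so every accepted candidate ends up in
 * the batch with equal probability.
 */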
if (
meta.partition_reallocations.size()
< effective_batch_size) {
to_move.left_to_move--;
meta.partition_reallocations.push_back(
std::move(reallocation));
} else {
auto r_idx = random_generators::get_int<size_t>(
0, idx - 1);
if (r_idx < meta.partition_reallocations.size()) {
std::swap(
meta.partition_reallocations[r_idx],
reallocation);
}
}
break;
}
}
}
++to_move_it;
}
if (clusterlog.is_enabled(ss::log_level::debug)) {
for (auto& r : meta.partition_reallocations) {
vlog(
clusterlog.debug,
"{} moving {} -> {}",
r.ntp,
*r.replicas_to_remove.begin(),
subtract_replica_sets(r.new_replica_set, r.current_replica_set));
}
}
// reset after moving over all the partitions, the only case if the idx
// iterator is not reset is the case when we reached max concurrent
// reallocations.
idx_it->second = 0;
}

std::vector<model::ntp> members_backend::ntps_moving_from_node_older_than(
