From 969a8bead29ea96509f1b15ce518b12e7fd55fad Mon Sep 17 00:00:00 2001 From: Michal Maslanka Date: Wed, 22 Mar 2023 18:20:20 +0100 Subject: [PATCH] c/members_backend: use reservoir sampling to chose replicas to move Using reservoir sampling to chose the replicas that are going to be reallocated to the new node provides better uniformity of topic replicas distribution across the cluster. Additionally reservoir sampling algorithm isn't prone to not selecting enough reallocations. Signed-off-by: Michal Maslanka --- src/v/cluster/members_backend.cc | 261 ++++++++++++++++++------------- 1 file changed, 151 insertions(+), 110 deletions(-) diff --git a/src/v/cluster/members_backend.cc b/src/v/cluster/members_backend.cc index 38a2c1d6a0e5b..520c06c5bff25 100644 --- a/src/v/cluster/members_backend.cc +++ b/src/v/cluster/members_backend.cc @@ -90,12 +90,6 @@ void reassign_replicas( partition_allocator& allocator, partition_assignment& current_assignment, members_backend::partition_reallocation& reallocation) { - vlog( - clusterlog.debug, - "[ntp: {}, {} -> -] trying to reassign partition replicas", - reallocation.ntp, - current_assignment.replicas); - // remove nodes that are going to be reassigned from current assignment. std::erase_if( current_assignment.replicas, @@ -179,10 +173,6 @@ void reassign_replicas( **/ unevenness_error_info calculate_unevenness_error( const partition_allocator& allocator, partition_allocation_domain domain) { - static const std::vector domains{ - partition_allocation_domains::consumer_offsets, - partition_allocation_domains::common}; - const auto node_cnt = allocator.state().available_nodes(); const auto node_replicas = calculate_replicas_per_node(allocator, domain); @@ -210,10 +200,12 @@ unevenness_error_info calculate_unevenness_error( vlog( clusterlog.trace, - "node {} has {} replicas allocated, requested replicas per node " + "node {} has {} replicas allocated in domain {}, requested replicas " + "per node " "{}, difference: {}", id, allocation_info.allocated_replicas, + domain, target_replicas_per_node, diff); err += std::abs(diff); @@ -466,27 +458,20 @@ void members_backend::default_reallocation_strategy:: max_batch_size, allocator, topics, meta, domain); auto current_error = calculate_unevenness_error(allocator, domain); auto [it, _] = meta.last_unevenness_error.try_emplace(domain, 1.0); - const auto min_improvement - = std::max( - (meta.partition_reallocations.size() - prev_reallocations_count) / 10, - 1) - * current_error.e_step; auto improvement = it->second - current_error.e; vlog( clusterlog.info, - "[update: {}] unevenness error: {}, previous error: {}, " - "improvement: {}, min improvement: {}", + "[update: {}] unevenness error: {}, previous error: {}, improvement: {}", meta.update, current_error.e, it->second, - improvement, - min_improvement); + improvement); it->second = current_error.e; // drop all reallocations if there is no improvement - if (improvement < min_improvement) { + if (improvement < 0) { meta.partition_reallocations.erase( std::next( meta.partition_reallocations.begin(), @@ -572,6 +557,34 @@ bool is_reassigned_to_node( node_id); } +double error_after_move( + size_t total_replicas, + const node_replicas_map_t& replicas_per_node, + const partition_allocator& allocator, + const std::vector& new_replica_set, + const std::vector& old_replica_set) { + const auto total_nodes = allocator.state().available_nodes(); + + auto added_nodes = subtract_replica_sets(new_replica_set, old_replica_set); + auto removed_nodes = subtract_replica_sets( + old_replica_set, new_replica_set); + auto target_replicas_per_node = total_replicas / total_nodes; + + double err = 0; + + for (auto& [id, allocation_info] : replicas_per_node) { + double diff = static_cast(target_replicas_per_node) + - static_cast( + allocation_info.allocated_replicas + + contains_node(added_nodes, id) + - contains_node(removed_nodes, id)); + + err += std::abs(diff); + } + err /= (static_cast(total_replicas) * total_nodes); + return err; +} + void members_backend::default_reallocation_strategy:: calculate_reallocations_batch( size_t max_batch_size, @@ -603,19 +616,6 @@ void members_backend::default_reallocation_strategy:: to_move_from_node.emplace_back(id, to_move); } - auto all_empty = std::all_of( - to_move_from_node.begin(), - to_move_from_node.end(), - [](const replicas_to_move& m) { return m.left_to_move == 0; }); - // nothing to do, exit early - if (all_empty) { - return; - } - - auto cmp = [](const replicas_to_move& lhs, const replicas_to_move& rhs) { - return lhs.left_to_move < rhs.left_to_move; - }; - if (clusterlog.is_enabled(ss::log_level::info)) { for (const auto& [id, cnt] : to_move_from_node) { vlog( @@ -629,105 +629,146 @@ void members_backend::default_reallocation_strategy:: node_replicas[id].allocated_replicas); } } - auto [idx_it, _] = meta.last_ntp_index.try_emplace(domain, 0); - size_t current_idx = 0; - // 4. Pass over all partition metadata - for (auto& [tp_ns, metadata] : topics.topics_map()) { - // skip partitions outside of current domain - if (get_allocation_domain(tp_ns) != domain) { - continue; - } - // do not try to move internal partitions - if ( - tp_ns.ns == model::kafka_internal_namespace - || tp_ns.ns == model::redpanda_ns) { - continue; - } - if (!metadata.is_topic_replicable()) { - continue; + + static const auto cmp = + [](const replicas_to_move& lhs, const replicas_to_move& rhs) { + return lhs.left_to_move < rhs.left_to_move; + }; + + auto initial_error = error_after_move( + total_replicas, node_replicas, allocator, {}, {}); + + std::erase_if(to_move_from_node, [](const replicas_to_move& to_move) { + return to_move.left_to_move <= 0; + }); + + std::sort(to_move_from_node.begin(), to_move_from_node.end(), cmp); + + if (to_move_from_node.empty()) { + return; + } + + auto to_move_it = to_move_from_node.begin(); + while (meta.partition_reallocations.empty() + && to_move_it != to_move_from_node.end()) { + auto& to_move = *to_move_it; + + size_t effective_batch_size = std::min( + to_move.left_to_move, + max_batch_size - meta.partition_reallocations.size()); + + if (effective_batch_size <= 0) { + return; } - // do not move topics that were created after node was added, they are - // allocated with new cluster capacity - if (meta.update) { - if (metadata.get_revision() > meta.update->offset()) { - vlog( - clusterlog.debug, - "skipping reallocating topic {}, its revision {} is greater " - "than " - "node update {}", - tp_ns, - metadata.get_revision(), - meta.update->offset); + + meta.partition_reallocations.reserve(effective_batch_size); + size_t idx = 0; + for (auto& [tp_ns, metadata] : topics.topics_map()) { + // skip partitions outside of current domain + if (get_allocation_domain(tp_ns) != domain) { continue; } - } - size_t rand_idx = 0; - /** - * We use 64 bit field initialized with random number to sample topic - * partitions to move, every time the bitfield is exhausted we - * reinitialize it and reset counter. This way we chose random - * partitions to move without iterating multiple times over the topic - * set - */ - std::bitset sampling_bytes; - for (const auto& p : metadata.get_assignments()) { - if (idx_it->second > current_idx++) { + // do not try to move internal partitions + if ( + tp_ns.ns == model::kafka_internal_namespace + || tp_ns.ns == model::redpanda_ns) { continue; } - - if (rand_idx % sizeof(uint64_t) == 0) { - sampling_bytes = random_generators::get_int( - std::numeric_limits::max()); - rand_idx = 0; - } - - idx_it->second++; - - if (sampling_bytes.test(rand_idx++)) { + if (!metadata.is_topic_replicable()) { continue; } + // do not move topics that were created after node was added, they + // are allocated with new cluster capacity + if (meta.update) { + if (metadata.get_revision() > meta.update->offset()) { + vlog( + clusterlog.debug, + "skipping reallocating topic {}, its revision {} is " + "greater than node update {}", + tp_ns, + metadata.get_revision(), + meta.update->offset); + continue; + } + } - std::erase_if(to_move_from_node, [](const replicas_to_move& v) { - return v.left_to_move == 0; - }); - - std::sort(to_move_from_node.begin(), to_move_from_node.end(), cmp); - for (auto& to_move : to_move_from_node) { - if ( - is_in_replica_set(p.replicas, to_move.id) - && to_move.left_to_move > 0) { + for (const auto& p : metadata.get_assignments()) { + if (is_in_replica_set(p.replicas, to_move.id)) { partition_reallocation reallocation( model::ntp(tp_ns.ns, tp_ns.tp, p.id), p.replicas.size()); reallocation.replicas_to_remove.emplace(to_move.id); auto current_assignment = p; reassign_replicas( allocator, current_assignment, reallocation); - // skip if partition was reassigned to the same node - if (is_reassigned_to_node(reallocation, to_move.id)) { + + if (!reallocation.allocation_units.has_value()) { + continue; + } + const auto& new_assignment = reallocation.allocation_units + .value() + .get_assignments() + .begin() + ->replicas; + + auto new_error = error_after_move( + total_replicas, + node_replicas, + allocator, + new_assignment, + p.replicas); + vlog( + clusterlog.trace, + "ntp {} move from {} -> {}, error before {}, error " + "after: {}", + reallocation.ntp, + p.replicas, + new_assignment, + initial_error, + new_error); + /** + * If there is no error improvement, skip this reallocation + */ + if (new_error >= initial_error) { continue; } + idx++; reallocation.current_replica_set = p.replicas; reallocation.state = reallocation_state::reassigned; - meta.partition_reallocations.push_back( - std::move(reallocation)); - to_move.left_to_move--; - // reached max concurrent reallocations, yield - if (meta.partition_reallocations.size() >= max_batch_size) { - vlog( - clusterlog.info, - "reached limit of max concurrent reallocations: {}", - meta.partition_reallocations.size()); - return; + /** + * Reservoir sampling part, after we have at least required + * number of moves replace one of the reallocations with + * decreasing probability. + */ + if ( + meta.partition_reallocations.size() + < effective_batch_size) { + to_move.left_to_move--; + meta.partition_reallocations.push_back( + std::move(reallocation)); + } else { + auto r_idx = random_generators::get_int( + 0, idx - 1); + if (r_idx < meta.partition_reallocations.size()) { + std::swap( + meta.partition_reallocations[r_idx], + reallocation); + } } - break; } } } + ++to_move_it; + } + if (clusterlog.is_enabled(ss::log_level::debug)) { + for (auto& r : meta.partition_reallocations) { + vlog( + clusterlog.debug, + "{} moving {} -> {}", + r.ntp, + *r.replicas_to_remove.begin(), + subtract_replica_sets(r.new_replica_set, r.current_replica_set)); + } } - // reset after moving over all the partitions, the only case if the idx - // iterator is not reset is the case when we reached max concurrent - // reallocations. - idx_it->second = 0; } std::vector members_backend::ntps_moving_from_node_older_than(