diff --git a/tests/rptest/tests/scaling_up_test.py b/tests/rptest/tests/scaling_up_test.py index e91ebf265f4c..db153500ea66 100644 --- a/tests/rptest/tests/scaling_up_test.py +++ b/tests/rptest/tests/scaling_up_test.py @@ -27,41 +27,47 @@ class ScalingUpTest(EndToEndTest): rebalance_timeout = 120 group_topic_partitions = 16 - def _replicas_per_node(self): + # returns [{common domain topics replicas},{consumer_offsets replicas}] + def _replicas_per_domain_per_node(self): kafkacat = KafkaCat(self.redpanda) - node_replicas = {} + node_replicas = [{}, {}] md = kafkacat.metadata() self.redpanda.logger.debug(f"metadata: {md}") for topic in md['topics']: + # this index doesn't agree to the numbering in cluster/types.h + # but that does not really matter until there will be more domains + i = 1 if topic['topic'] == '__consumer_offsets' else 0 for p in topic['partitions']: for r in p['replicas']: id = r['id'] - if id not in node_replicas: - node_replicas[id] = 0 - node_replicas[id] += 1 + if id not in node_replicas[i]: + node_replicas[i][id] = 0 + node_replicas[i][id] += 1 return node_replicas def wait_for_partitions_rebalanced(self, total_replicas, timeout_sec): - - expected_per_node = total_replicas / len(self.redpanda.started_nodes()) - expected_range = [0.8 * expected_per_node, 1.2 * expected_per_node] - def partitions_rebalanced(): - per_node = self._replicas_per_node() + per_domain = self._replicas_per_domain_per_node() self.redpanda.logger.info( - f"replicas per node: {per_node}, expected range: [{expected_range[0]},{expected_range[1]}]" - ) - if len(per_node) < len(self.redpanda.started_nodes()): - return False + f"replicas per node: common domain: {per_domain[0]}, " + f"__consumer_offsets: {per_domain[1]} ") - replicas = sum(per_node.values()) - if replicas != total_replicas: + # total # of nodes + if len(set().union(*(n.keys() for n in per_domain))) < len( + self.redpanda.started_nodes()): return False - if not all(expected_range[0] <= p[1] <= expected_range[1] - for p in per_node.items()): + # total # of replicas + if sum(p for n in per_domain + for p in n.values()) != total_replicas: return False + + # replicas balanced condition separately for each domain + for per_node in per_domain: + if max(per_node.values()) - min(per_node.values()) > 1: + return False + admin = Admin(self.redpanda) # make sure that all reconfigurations are finished @@ -184,7 +190,7 @@ def test_on_demand_rebalancing(self, partition_count): # verify that all new nodes are empty - per_node = self._replicas_per_node() + per_node = self._replicas_per_domain_per_node() assert len(per_node) == 3