Change the balanced partition criteria in scaling up DTs #7768

Closed
44 changes: 25 additions & 19 deletions tests/rptest/tests/scaling_up_test.py
@@ -27,41 +27,47 @@ class ScalingUpTest(EndToEndTest):
     rebalance_timeout = 120
     group_topic_partitions = 16
 
-    def _replicas_per_node(self):
+    # returns [{common domain topics replicas}, {__consumer_offsets replicas}]
+    def _replicas_per_domain_per_node(self):
         kafkacat = KafkaCat(self.redpanda)
-        node_replicas = {}
+        node_replicas = [{}, {}]
         md = kafkacat.metadata()
         self.redpanda.logger.debug(f"metadata: {md}")
         for topic in md['topics']:
+            # this index doesn't agree with the numbering in cluster/types.h,
+            # but that does not really matter until there are more domains
+            i = 1 if topic['topic'] == '__consumer_offsets' else 0
             for p in topic['partitions']:
                 for r in p['replicas']:
                     id = r['id']
-                    if id not in node_replicas:
-                        node_replicas[id] = 0
-                    node_replicas[id] += 1
+                    if id not in node_replicas[i]:
+                        node_replicas[i][id] = 0
+                    node_replicas[i][id] += 1
 
         return node_replicas
 
     def wait_for_partitions_rebalanced(self, total_replicas, timeout_sec):
 
-        expected_per_node = total_replicas / len(self.redpanda.started_nodes())
-        expected_range = [0.8 * expected_per_node, 1.2 * expected_per_node]
-
         def partitions_rebalanced():
-            per_node = self._replicas_per_node()
+            per_domain = self._replicas_per_domain_per_node()
             self.redpanda.logger.info(
-                f"replicas per node: {per_node}, expected range: [{expected_range[0]},{expected_range[1]}]"
-            )
-            if len(per_node) < len(self.redpanda.started_nodes()):
-                return False
+                f"replicas per node: common domain: {per_domain[0]}, "
+                f"__consumer_offsets: {per_domain[1]}")
 
-            replicas = sum(per_node.values())
-            if replicas != total_replicas:
+            # total # of nodes
+            if len(set().union(*(n.keys() for n in per_domain))) < len(
+                    self.redpanda.started_nodes()):
                 return False
 
-            if not all(expected_range[0] <= p[1] <= expected_range[1]
-                       for p in per_node.items()):
+            # total # of replicas
+            if sum(p for n in per_domain
+                   for p in n.values()) != total_replicas:
                 return False
 
+            # replica balance condition, checked separately for each domain
+            for per_node in per_domain:
+                if max(per_node.values()) - min(per_node.values()) > 1:
+                    return False
+
             admin = Admin(self.redpanda)
 
             # make sure that all reconfigurations are finished
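
The substantive change in the hunk above is the balance criterion itself: the old check accepted the cluster when every node's total replica count fell within ±20% of the per-node mean, while the new check requires that, within each domain separately, the most and least loaded nodes differ by at most one replica. Below is a minimal standalone sketch contrasting the two checks; the node ids and replica counts are invented for illustration.

# Hypothetical per-domain replica counts, shaped like the helper's return value.
per_domain = [
    {1: 11, 2: 10, 3: 11},  # common domain (regular topics)
    {1: 6, 2: 5, 3: 5},  # __consumer_offsets domain
]
nodes = sorted(set().union(*(d.keys() for d in per_domain)))

# Old criterion: every node's total within [0.8, 1.2] x the per-node mean,
# computed over all replicas regardless of domain.
totals = {n: sum(d.get(n, 0) for d in per_domain) for n in nodes}
mean = sum(totals.values()) / len(nodes)
old_balanced = all(0.8 * mean <= t <= 1.2 * mean for t in totals.values())

# New criterion: within each domain, max and min node counts differ by <= 1.
new_balanced = all(max(d.values()) - min(d.values()) <= 1 for d in per_domain)

print(old_balanced, new_balanced)  # both True for these counts

The per-domain form can fail where the global ±20% band passes: a cluster whose combined totals are even may still have, say, all __consumer_offsets replicas piled onto one node.
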
@@ -184,7 +190,7 @@ def test_on_demand_rebalancing(self, partition_count):

         # verify that all new nodes are empty
 
-        per_node = self._replicas_per_node()
+        per_node = self._replicas_per_domain_per_node()
 
         assert len(per_node) == 3
 
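For reference, the renamed helper buckets replicas by domain before counting, so callers now receive a two-element list of dicts rather than a single dict. A sketch of that transformation on a trimmed-down, hypothetical kafkacat metadata payload (only the fields the test actually reads; the real payload carries more):

# Invented metadata; index 0 collects regular topics, index 1 __consumer_offsets.
md = {
    'topics': [
        {'topic': 'foo',
         'partitions': [{'replicas': [{'id': 1}, {'id': 2}, {'id': 3}]}]},
        {'topic': '__consumer_offsets',
         'partitions': [{'replicas': [{'id': 1}, {'id': 2}, {'id': 3}]}]},
    ]
}

node_replicas = [{}, {}]
for topic in md['topics']:
    i = 1 if topic['topic'] == '__consumer_offsets' else 0
    for p in topic['partitions']:
        for r in p['replicas']:
            # count one replica for this node id in the topic's domain
            node_replicas[i][r['id']] = node_replicas[i].get(r['id'], 0) + 1

print(node_replicas)  # [{1: 1, 2: 1, 3: 1}, {1: 1, 2: 1, 3: 1}]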