Do not allow decommissioned node rejoining the cluster #8547

Merged
6 commits merged on Feb 1, 2023
23 changes: 22 additions & 1 deletion src/v/cluster/members_manager.cc
@@ -593,7 +593,13 @@ members_manager::dispatch_join_to_seed_server(
return f.then_wrapped([it, this, req](ss::future<ret_t> fut) {
try {
auto r = fut.get0();
if (r && r.value().success) {
if (r.has_error() || !r.value().success) {
vlog(
clusterlog.warn,
"Error joining cluster using {} seed server - {}",
it->addr,
r.has_error() ? r.error().message() : "not allowed to join");
} else {
return ss::make_ready_future<ret_t>(r);
}
} catch (...) {
@@ -783,7 +789,22 @@ members_manager::handle_join_request(join_node_request const req) {
co_return ret_t(
join_node_reply{false, model::unassigned_node_id});
}
// if the node was removed from the cluster, don't allow it to rejoin
// with the same UUID
if (_members_table.local()
.get_removed_node_metadata_ref(it->second)
.has_value()) {
vlog(
clusterlog.warn,
"Preventing decommissioned node {} with UUID {} from joining "
"the cluster",
it->second,
it->first);
co_return ret_t(
join_node_reply{false, model::unassigned_node_id});
}
}

// Proceed to adding the node ID to the controller Raft group.
// Presumably the node that made this join request started its Raft
// subsystem with the node ID and is waiting to join the group.
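In plain terms, the `handle_join_request` hunk above now consults the removed-nodes metadata before admitting a joiner: if the requesting UUID belongs to a node that was decommissioned and removed, the reply is `{false, unassigned_node_id}`. A minimal Python sketch of that gating logic — the names `MembersTable` and `handle_join_request` below are illustrative stand-ins, not the actual C++ API:

```python
# Hypothetical, simplified model of the join gating added above; the real
# check lives in members_manager::handle_join_request (C++).
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class MembersTable:
    active: Dict[str, int] = field(default_factory=dict)   # uuid -> node id
    removed: Dict[str, int] = field(default_factory=dict)  # decommissioned nodes


def handle_join_request(table: MembersTable, node_uuid: str,
                        node_id: int) -> Tuple[bool, Optional[int]]:
    """Return (success, assigned node id); refuse UUIDs of removed nodes."""
    if node_uuid in table.removed:
        # A decommissioned node trying to rejoin with the same UUID is rejected.
        return False, None
    table.active[node_uuid] = node_id
    return True, node_id


table = MembersTable(removed={"uuid-3": 3})
assert handle_join_request(table, "uuid-3", 3) == (False, None)   # rejected
assert handle_join_request(table, "uuid-4", 4) == (True, 4)       # admitted
```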
5 changes: 2 additions & 3 deletions src/v/raft/replicate_entries_stm.cc
@@ -284,10 +284,9 @@ ss::future<result<replicate_result>> replicate_entries_stm::apply(units_t u) {
}
if (rni != _ptr->self()) {
auto it = _ptr->_fstats.find(rni);
if (it == _ptr->_fstats.end()) {
return;
if (it != _ptr->_fstats.end()) {
it->second.last_sent_offset = _dirty_offset;
}
it->second.last_sent_offset = _dirty_offset;
}
++_requests_count;
(void)dispatch_one(rni); // background
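The replicate_entries_stm.cc hunk above swaps an early return for a conditional update: when a follower has no entry in `_fstats`, only the `last_sent_offset` bookkeeping is skipped, while the request is still counted and dispatched. A rough Python illustration of the fixed control flow — the names below are illustrative, not the Raft code's API:

```python
# Illustrative only: models the control-flow change, not the actual Raft state machine.
def replicate_after_fix(followers, fstats, dirty_offset):
    requests, dispatched = 0, []
    for node in followers:
        # After the fix, a missing stats entry only skips the offset bookkeeping...
        if node in fstats:
            fstats[node]["last_sent_offset"] = dirty_offset
        # ...while the request is still counted and dispatched (previously the
        # loop body returned early here and the follower never got the request).
        requests += 1
        dispatched.append(node)
    return requests, dispatched


# A follower without stats ("n2") is still dispatched to.
requests, dispatched = replicate_after_fix(["n1", "n2"], {"n1": {}}, dirty_offset=42)
assert requests == 2 and dispatched == ["n1", "n2"]
```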
5 changes: 3 additions & 2 deletions tests/rptest/services/redpanda.py
@@ -1270,7 +1270,8 @@ def start_node(self,
first_start=False,
expect_fail: bool = False,
auto_assign_node_id: bool = False,
omit_seeds_on_idx_one: bool = True):
omit_seeds_on_idx_one: bool = True,
skip_readiness_check: bool = False):
"""
Start a single instance of redpanda. This function will not return until
redpanda appears to have started successfully. If redpanda does not
@@ -1315,7 +1316,7 @@ def start_rp():
err_msg=
f"Redpanda processes did not terminate on {node.name} during startup as expected"
)
else:
elif not skip_readiness_check:
wait_until(
lambda: self.__is_status_ready(node),
timeout_sec=timeout,
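The new `skip_readiness_check` flag exists so a test can start a node that is never expected to report ready (for example, a decommissioned node whose join requests are being refused) without `start_node` blocking on the readiness poll. A simplified sketch of the resulting post-launch wait logic, using hypothetical helper callables rather than the ducktape service itself:

```python
# Hypothetical, simplified model of start_node's post-launch waiting behaviour.
def wait_after_start(expect_fail: bool, skip_readiness_check: bool,
                     wait_for_exit, wait_for_ready):
    if expect_fail:
        # The process is expected to terminate on its own; wait for that instead.
        wait_for_exit()
    elif not skip_readiness_check:
        # Normal path: block until the node reports a ready status.
        wait_for_ready()
    # With skip_readiness_check=True the call returns immediately, letting the
    # caller inspect the logs for rejected join attempts.


calls = []
wait_after_start(False, True, lambda: calls.append("exit"),
                 lambda: calls.append("ready"))
assert calls == []  # neither wait runs when the readiness check is skipped
```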
49 changes: 49 additions & 0 deletions tests/rptest/tests/nodes_decommissioning_test.py
@@ -623,3 +623,52 @@ def has_partitions():
omit_seeds_on_idx_one=False)

wait_until(has_partitions, 180, 2)

@cluster(num_nodes=4, log_allow_list=RESTART_LOG_ALLOW_LIST)
@parametrize(new_bootstrap=True)
@parametrize(new_bootstrap=False)
def test_node_is_not_allowed_to_join_after_restart(self, new_bootstrap):
self.start_redpanda(num_nodes=4, new_bootstrap=new_bootstrap)
self._create_topics()

admin = Admin(self.redpanda)

to_decommission = self.redpanda.nodes[-1]
to_decommission_id = self.redpanda.node_id(to_decommission)
self.logger.info(f"decommissioning node: {to_decommission_id}")
admin.decommission_broker(to_decommission_id)
self._wait_for_node_removed(to_decommission_id)

# restart the decommissioned node without cleaning up its data directory;
# the node should not be allowed to rejoin the cluster
# since it was decommissioned
self.redpanda.stop_node(to_decommission)
self.redpanda.start_node(to_decommission,
auto_assign_node_id=new_bootstrap,
omit_seeds_on_idx_one=not new_bootstrap,
skip_readiness_check=True)

# wait until the decommissioned node has attempted to rejoin the cluster
def tried_to_join():
# allow failure as `grep` returns 1 when no entries match;
# ssh access shouldn't be a problem as only a few lines are transferred
logs = to_decommission.account.ssh_output(
f'tail -n 200 {RedpandaService.STDOUT_STDERR_CAPTURE} | grep members_manager | grep WARN',
allow_fail=True).decode()

# check if there are at least 3 failed join attempts
return sum([
1 for l in logs.splitlines() if "Error joining cluster" in l
]) >= 3

wait_until(tried_to_join, 20, 1)

assert len(admin.get_brokers(node=self.redpanda.nodes[0])) == 3
self.redpanda.stop_node(to_decommission)
# clean the node and restart it; it should join the cluster
self.redpanda.clean_node(to_decommission, preserve_logs=True)
self.redpanda.start_node(to_decommission,
omit_seeds_on_idx_one=not new_bootstrap,
auto_assign_node_id=new_bootstrap)

assert len(admin.get_brokers(node=self.redpanda.nodes[0])) == 4
9 changes: 9 additions & 0 deletions tests/rptest/utils/node_operations.py
@@ -231,6 +231,15 @@ def wait_for_removed(self, node_id: int):
self.logger.info(
f"executor - waiting for node {node_id} to be removed")

# wait for the node to be removed or for decommissioning to stop making progress
waiter = NodeDecommissionWaiter(self.redpanda,
node_id=node_id,
logger=self.logger,
progress_timeout=60)

waiter.wait_for_removal()

# just confirm that node removal was propagated to the majority of nodes
def is_node_removed(node_to_query, node_id):
try:
brokers = self.get_statuses(node_to_query)
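The `NodeDecommissionWaiter` added above blocks until the node is fully removed or decommissioning stops making progress within `progress_timeout`. A loose sketch of such a wait loop, assuming a hypothetical `poll()` callable that returns the number of partitions left on the node (or `None` once the node is gone) — not the actual waiter implementation:

```python
import time


# Illustrative sketch of a "removed or stalled" wait, not the actual
# NodeDecommissionWaiter implementation.
def wait_for_removal_or_stall(poll, progress_timeout=60, interval=1):
    last_value, last_progress = poll(), time.time()
    while True:
        remaining = poll()
        if remaining is None:
            return True                      # node no longer reported: removed
        if remaining != last_value:
            last_value, last_progress = remaining, time.time()  # progress made
        elif time.time() - last_progress > progress_timeout:
            return False                     # decommissioning stalled
        time.sleep(interval)


# Usage sketch: wait_for_removal_or_stall(lambda: remaining_partitions(node_id))
```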