Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce flakiness of certain Common Test suites #10364

Merged
merged 14 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -542,16 +542,18 @@ handle_tick(QName,
ok = repair_leader_record(QName, Self),
ExpectedNodes = rabbit_nodes:list_members(),
case Nodes -- ExpectedNodes of
[] ->
ok;
Stale ->
Stale when length(ExpectedNodes) > 0 ->
%% rabbit_nodes:list_members/0 returns [] when there
%% is an error so we need to handle that case
rabbit_log:debug("~ts: stale nodes detected. Purging ~w",
[rabbit_misc:rs(QName), Stale]),
%% pipeline purge command
{ok, Q} = rabbit_amqqueue:lookup(QName),
ok = ra:pipeline_command(amqqueue:get_pid(Q),
rabbit_fifo:make_purge_nodes(Stale)),

ok;
_ ->
ok
end
catch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ reconciliate_quorum_queue_membership(State) ->
Running = rabbit_nodes:list_running(),
reconciliate_quorum_members(ExpectedNodes, Running, LocalLeaders, State, noop).

reconciliate_quorum_members([], _Running, _, _State, Result) ->
%% if there are no expected nodes, rabbit_nodes:list_running/0 encountered
%% an error during the query and returned the empty list, which is a case we
%% need to handle
Result;
reconciliate_quorum_members(_ExpectedNodes, _Running, [], _State, Result) ->
Result;
reconciliate_quorum_members(ExpectedNodes, Running, [Q | LocalLeaders],
Expand Down
50 changes: 29 additions & 21 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -683,27 +683,35 @@ tick(_Ts, _State) ->

maybe_resize_coordinator_cluster() ->
spawn(fun() ->
case ra:members({?MODULE, node()}) of
{_, Members, _} ->
MemberNodes = [Node || {_, Node} <- Members],
Running = rabbit_nodes:list_running(),
All = rabbit_nodes:list_members(),
case Running -- MemberNodes of
[] ->
ok;
New ->
rabbit_log:info("~ts: New rabbit node(s) detected, "
"adding : ~w",
[?MODULE, New]),
add_members(Members, New)
end,
case MemberNodes -- All of
[] ->
ok;
Old ->
rabbit_log:info("~ts: Rabbit node(s) removed from the cluster, "
"deleting: ~w", [?MODULE, Old]),
remove_members(Members, Old)
RabbitIsRunning = rabbit:is_running(),
case rabbit_nodes:list_members() of
All when RabbitIsRunning andalso length(All) > 0 ->
%% the members need to be a non-empty list _and_
%% rabbit needs to be running for it to be safe
%% to do any of the below
case ra:members({?MODULE, node()}) of
{_, Members, _} ->
MemberNodes = [Node || {_, Node} <- Members],
Running = rabbit_nodes:list_running(),
case Running -- MemberNodes of
[] ->
ok;
New ->
rabbit_log:info("~ts: New rabbit node(s) detected, "
"adding : ~w",
[?MODULE, New]),
add_members(Members, New)
end,
case MemberNodes -- All of
[] ->
ok;
Old ->
rabbit_log:info("~ts: Rabbit node(s) removed from the cluster, "
"deleting: ~w", [?MODULE, Old]),
remove_members(Members, Old)
end;
_ ->
ok
end;
_ ->
ok
Expand Down
8 changes: 5 additions & 3 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -564,11 +564,13 @@ handle_event(QName, {osiris_offset, _From, _Offs},
{ok, State#stream_client{readers = Readers}, Deliveries};
handle_event(_QName, {stream_leader_change, Pid}, State) ->
{ok, update_leader_pid(Pid, State), []};
handle_event(_QName, {stream_local_member_change, Pid}, #stream_client{local_pid = P} = State)
handle_event(_QName, {stream_local_member_change, Pid},
#stream_client{local_pid = P} = State)
when P == Pid ->
{ok, State, []};
handle_event(_QName, {stream_local_member_change, Pid}, State = #stream_client{name = QName,
readers = Readers0}) ->
handle_event(_QName, {stream_local_member_change, Pid},
#stream_client{name = QName,
readers = Readers0} = State) ->
rabbit_log:debug("Local member change event for ~tp", [QName]),
Readers1 = maps:fold(fun(T, #stream{log = Log0, reader_options = Options} = S0, Acc) ->
Offset = osiris_log:next_offset(Log0),
Expand Down
36 changes: 33 additions & 3 deletions deps/rabbit/test/message_containers_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,30 @@ end_per_testcase(Testcase, Config) ->
enable_ff(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QueueType = ?config(queue_type, Config),
QName = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QName, 0, 0},
declare(Ch, QName, [{<<"x-queue-type">>, longstr,
?config(queue_type, Config)}])),
declare(Ch, QName,
[{<<"x-queue-type">>, longstr, QueueType}])),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),

timer:sleep(100),
case QueueType of
<<"stream">> ->
%% if it is a stream we need to wait until there is a local member
%% on the node we want to subscribe from before proceeding
rabbit_ct_helpers:await_condition(
fun() ->
rabbit_ct_broker_helpers:rpc(Config, 2, ?MODULE,
has_local_member,
[#resource{kind = queue,
virtual_host = <<"/">>,
name = QName}])
end, 60000),
ok;
_ ->
ok
end,

ConsumerTag1 = <<"ctag1">>,
Ch2 = rabbit_ct_client_helpers:open_channel(Config, 2),
Expand Down Expand Up @@ -243,3 +259,17 @@ get_global_counters(Config) ->
qos(Ch, Prefetch) ->
?assertMatch(#'basic.qos_ok'{},
amqp_channel:call(Ch, #'basic.qos'{prefetch_count = Prefetch})).

%% Reports whether the queue identified by QName currently has a live
%% stream member process, as seen by the node this function runs on.
%%
%% Returns `true' only when the queue lookup succeeds, the stream
%% coordinator answers `{ok, Pid}' for the queue's stream id, and that
%% pid is alive. Every other lookup or coordinator result yields `false'.
has_local_member(QName) ->
    local_member_alive(rabbit_amqqueue:lookup(QName)).

%% Dispatches on the result of the queue lookup.
local_member_alive({ok, Queue}) ->
    %% the stream id is stored under the `name' key of the queue type state
    #{name := StreamId} = amqqueue:get_type_state(Queue),
    case rabbit_stream_coordinator:local_pid(StreamId) of
        {ok, MemberPid} ->
            is_process_alive(MemberPid);
        _ ->
            false
    end;
local_member_alive(_LookupFailed) ->
    %% queue could not be looked up, so there is no member to check
    false.
22 changes: 13 additions & 9 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2990,13 +2990,15 @@ per_message_ttl(Config) ->

Msg1 = <<"msg1">>,

#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = QQ},
#amqp_msg{props = #'P_basic'{delivery_mode = 2,
expiration = <<"2000">>},
payload = Msg1}),

wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
amqp_channel:wait_for_confirms(Ch, 5),
%% we know the message got to the queue; in 2s it should be gone
timer:sleep(2000),
wait_for_messages(Config, [[QQ, <<"0">>, <<"0">>, <<"0">>]]),
ok.
Expand Down Expand Up @@ -3392,7 +3394,7 @@ leader_locator_balanced_maintenance(Config) ->
|| Q <- Qs].

leader_locator_balanced_random_maintenance(Config) ->
[S1, S2, S3] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[S1, S2, _S3] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, S1),
Q = ?config(queue_name, Config),

Expand All @@ -3414,15 +3416,15 @@ leader_locator_balanced_random_maintenance(Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
Leader
end || _ <- lists:seq(1, 10)],
?assert(lists:member(S1, Leaders)),
?assertNot(lists:member(S2, Leaders)),
?assert(lists:member(S3, Leaders)),

ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env,
[rabbit, queue_leader_locator]),
ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, unset_env,
[rabbit, queue_count_start_random_selection]),
true = rabbit_ct_broker_helpers:unmark_as_being_drained(Config, S2).
true = rabbit_ct_broker_helpers:unmark_as_being_drained(Config, S2),
%% assert after resetting maintenance mode else other tests may also fail
?assertNot(lists:member(S2, Leaders)),
ok.

leader_locator_policy(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Expand All @@ -3441,12 +3443,14 @@ leader_locator_policy(Config) ->
{ok, _, {_, Leader}} = ra:members({ra_name(Q), Server}),
Leader
end || Q <- Qs],
?assertEqual(3, sets:size(sets:from_list(Leaders))),

[?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q}))
|| Q <- Qs],
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"my-leader-locator">>).
ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"my-leader-locator">>),

?assertEqual(3, length(lists:usort(Leaders))),
ok.

select_nodes_with_least_replicas(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Expand Down
50 changes: 32 additions & 18 deletions deps/rabbit/test/rabbit_stream_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,9 @@ find_queue_info(Config, Keys) ->
find_queue_info(Config, 0, Keys).

find_queue_info(Config, Node, Keys) ->
Name = ?config(queue_name, Config),
find_queue_info(?config(queue_name, Config), Config, Node, Keys).

find_queue_info(Name, Config, Node, Keys) ->
QName = rabbit_misc:r(<<"/">>, queue, Name),
Infos = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, info_all,
[<<"/">>, [name] ++ Keys]),
Expand Down Expand Up @@ -891,12 +893,13 @@ recover(Config) ->
%% Such a slow test, let's select a single random permutation and trust that over enough
%% ci rounds any failure will eventually show up

flush(),
ct:pal("recover: running stop start for permutation ~w", [Servers]),
[rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers],
[rabbit_ct_broker_helpers:async_start_node(Config, S) || S <- lists:reverse(Servers)],
[ok = rabbit_ct_broker_helpers:wait_for_async_start_node(S) || S <- lists:reverse(Servers)],

ct:pal("recover: running stop waiting for messages ~w", [Servers]),
ct:pal("recover: post stop / start, waiting for messages ~w", [Servers]),
check_leader_and_replicas(Config, Servers0),
queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]], 60),

Expand Down Expand Up @@ -2087,27 +2090,30 @@ leader_locator_client_local(Config) ->
?assertMatch(#'queue.delete_ok'{},
delete(Config, Server1, Q)),

Q2 = <<Q/binary, "-2">>,
%% Try second node
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server2, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
?assertEqual({'queue.declare_ok', Q2, 0, 0},
declare(Config, Server2, Q2, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),

?assertMatch(Server2, proplists:get_value(leader,
find_queue_info(Config, [leader]))),
find_queue_info(Q2, Config, 0, [leader]))),

?assertMatch(#'queue.delete_ok'{}, delete(Config, Server2, Q2)),

?assertMatch(#'queue.delete_ok'{}, delete(Config, Server2, Q)),

Q3 = <<Q/binary, "-3">>,
%% Try third node
?assertEqual({'queue.declare_ok', Q, 0, 0},
declare(Config, Server3, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
?assertEqual({'queue.declare_ok', Q3, 0, 0},
declare(Config, Server3, Q3, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])),


?assertEqual(Server3, proplists:get_value(leader,
find_queue_info(Config, [leader]))),
find_queue_info(Q3, Config, 0, [leader]))),

?assertMatch(#'queue.delete_ok'{}, delete(Config, Server3, Q)),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
?assertMatch(#'queue.delete_ok'{}, delete(Config, Server3, Q3)),
ok.

leader_locator_balanced(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Expand Down Expand Up @@ -2145,9 +2151,12 @@ leader_locator_balanced_maintenance(Config) ->
declare(Config, Server1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
{<<"x-queue-leader-locator">>, longstr, <<"balanced">>}])),

Info = find_queue_info(Config, [leader]),
Leader = proplists:get_value(leader, Info),
?assert(lists:member(Leader, [Server1, Server2])),
rabbit_ct_helpers:await_condition(
fun() ->
Info = find_queue_info(Config, [leader]),
Leader = proplists:get_value(leader, Info),
lists:member(Leader, [Server1, Server2])
end, 60000),

true = rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server3),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
Expand Down Expand Up @@ -2590,10 +2599,15 @@ check_leader_and_replicas(Config, Members) ->
check_leader_and_replicas(Config, Members, Tag) ->
rabbit_ct_helpers:await_condition(
fun() ->
Info = find_queue_info(Config, [leader, Tag]),
ct:pal("~ts members ~w ~tp", [?FUNCTION_NAME, Members, Info]),
lists:member(proplists:get_value(leader, Info), Members)
andalso (lists:sort(Members) == lists:sort(proplists:get_value(Tag, Info)))
case find_queue_info(Config, [leader, Tag]) of
[] ->
false;
Info ->
ct:pal("~ts members ~w ~tp", [?FUNCTION_NAME, Members, Info]),
lists:member(proplists:get_value(leader, Info), Members)
andalso (lists:sort(Members) ==
lists:sort(proplists:get_value(Tag, Info)))
end
end, 60_000).

check_members(Config, ExpectedMembers) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ defmodule RabbitMQ.CLI.Ctl.Commands.WaitCommand do
timeout,
fn ->
case :file.read_file(pid_file) do
{:ok, <<>>} ->
{:error, :loop}

{:ok, bin} ->
case Integer.parse(bin) do
:error ->
Expand Down
4 changes: 3 additions & 1 deletion deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ amqp(Config) ->
{ok, Session1} = amqp10_client:begin_session(Connection1),
ReceiverLinkName = <<"test-receiver">>,
{ok, Receiver} = amqp10_client:attach_receiver_link(
Session1, ReceiverLinkName, <<"/topic/topic.1">>, unsettled),
Session1, ReceiverLinkName, <<"/topic/topic.1">>, unsettled,
configuration),

%% MQTT 5.0 to AMQP 1.0
C = connect(ClientId, Config),
Expand All @@ -183,6 +184,7 @@ amqp(Config) ->
'User-Property' => UserProperty},
RequestPayload, [{qos, 1}]),


%% As of 3.13, AMQP 1.0 is proxied via AMQP 0.9.1 and therefore the conversion from
%% mc_mqtt to mc_amqpl takes place. We therefore lose MQTT User Property and Response Topic
%% which gets converted to AMQP 0.9.1 headers. In the future, Native AMQP 1.0 will convert
Expand Down
Loading