Skip to content

Commit

Permalink
Merge pull request #9909 from rabbitmq/rabbitmq-server-9894
Browse files Browse the repository at this point in the history
Dynamic Shovels: support old (pre-3.13.0, 3.12.8) and new supervisor child ID formats
  • Loading branch information
michaelklishin committed Nov 14, 2023
2 parents 61dac76 + 28741cd commit 2adec32
Showing 1 changed file with 32 additions and 5 deletions.
37 changes: 32 additions & 5 deletions deps/rabbitmq_shovel/src/rabbit_shovel_dyn_worker_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ obfuscated_uris_parameters(Def) when is_list(Def) ->
rabbit_shovel_parameters:obfuscate_uris_in_definition(Def).

child_exists(Name) ->
lists:any(fun ({{_, N}, _, _, _}) -> N =:= Name end,
lists:any(fun ({{_, N}, _, _, _}) -> N =:= Name;
%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894.
({N, _, _, _}) -> N =:= Name
end,
mirrored_supervisor:which_children(?SUPERVISOR)).

stop_child({VHost, ShovelName} = Name) ->
Expand All @@ -83,19 +86,43 @@ stop_child({VHost, ShovelName} = Name) ->
%% See rabbit_shovel_worker:terminate/2

cleanup_specs() ->
SpecsSet = sets:from_list([element(2, element(1, S)) || S <- mirrored_supervisor:which_children(?SUPERVISOR)]),
Children = mirrored_supervisor:which_children(?SUPERVISOR),

%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
OldStyleSpecsSet = sets:from_list([element(1, S) || S <- Children]),
NewStyleSpecsSet = sets:from_list([element(2, element(1, S)) || S <- Children]),
ParamsSet = sets:from_list([ {proplists:get_value(vhost, S), proplists:get_value(name, S)}
|| S <- rabbit_runtime_parameters:list_component(<<"shovel">>) ]),
F = fun(Name, ok) ->
_ = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name)),
try
%% The supervisor operation is very unlikely to fail, it's the schema
%% data stores that can make a fuss about a non-existent or non-standard value passed in.
%% For example, an old style Shovel name is an invalid Khepri query path element. MK.
_ = mirrored_supervisor:delete_child(?SUPERVISOR, id(Name))
catch _:_:_Stacktrace ->
ok
end,
ok
end,
ok = sets:fold(F, ok, sets:subtract(SpecsSet, ParamsSet)).
%% Khepri won't handle values in OldStyleSpecsSet in its path well. At the same time,
%% those older elements simply cannot exist in Khepri because having Khepri enabled
%% means a cluster-wide move to 3.13+, so we can conditionally compute what specs we care about. MK.
AllSpecs =
case rabbit_khepri:is_enabled() of
true -> NewStyleSpecsSet;
false -> sets:union(NewStyleSpecsSet, OldStyleSpecsSet)
end,
%% Delete any supervisor children that do not have their respective runtime parameters in the database.
SetToCleanUp = sets:subtract(AllSpecs, ParamsSet),
ok = sets:fold(F, ok, SetToCleanUp).

%%----------------------------------------------------------------------------

init([]) ->
{ok, {{one_for_one, 3, 10}, []}}.

id({V, S} = Name) ->
{[V, S], Name}.
{[V, S], Name};
%% older format, pre 3.13.0 and 3.12.8. See rabbitmq/rabbitmq-server#9894
id(Other) ->
Other.

0 comments on commit 2adec32

Please sign in to comment.