Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use different AMQP address format for v1 and v2 #11604

Merged
merged 2 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_trace.erl",
"src/rabbit_tracking_store.erl",
"src/rabbit_upgrade_preparation.erl",
"src/rabbit_uri.erl",
"src/rabbit_variable_queue.erl",
"src/rabbit_version.erl",
"src/rabbit_vhost.erl",
Expand Down Expand Up @@ -481,6 +482,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_trace.erl",
"src/rabbit_tracking_store.erl",
"src/rabbit_upgrade_preparation.erl",
"src/rabbit_uri.erl",
"src/rabbit_variable_queue.erl",
"src/rabbit_version.erl",
"src/rabbit_vhost.erl",
Expand Down Expand Up @@ -762,6 +764,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_tracking.erl",
"src/rabbit_tracking_store.erl",
"src/rabbit_upgrade_preparation.erl",
"src/rabbit_uri.erl",
"src/rabbit_variable_queue.erl",
"src/rabbit_version.erl",
"src/rabbit_vhost.erl",
Expand Down
18 changes: 9 additions & 9 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ handle_http_req(<<"GET">>,
_User,
_ConnPid,
PermCaches) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
QName = queue_resource(Vhost, QNameBin),
case rabbit_amqqueue:with(
QName,
Expand Down Expand Up @@ -110,7 +110,7 @@ handle_http_req(HttpMethod = <<"PUT">>,
exclusive := Exclusive,
arguments := QArgs0
} = decode_queue(ReqPayload),
QNameBin = uri_string:unquote(QNameBinQuoted),
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
Owner = case Exclusive of
true -> ConnPid;
false -> none
Expand Down Expand Up @@ -185,7 +185,7 @@ handle_http_req(<<"PUT">>,
User = #user{username = Username},
_ConnPid,
{PermCache0, TopicPermCache}) ->
XNameBin = uri_string:unquote(XNameBinQuoted),
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
#{type := XTypeBin,
durable := Durable,
auto_delete := AutoDelete,
Expand Down Expand Up @@ -226,7 +226,7 @@ handle_http_req(<<"DELETE">>,
User,
ConnPid,
{PermCache0, TopicPermCache}) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
QName = queue_resource(Vhost, QNameBin),
PermCache = check_resource_access(QName, read, User, PermCache0),
try rabbit_amqqueue:with_exclusive_access_or_die(
Expand Down Expand Up @@ -254,7 +254,7 @@ handle_http_req(<<"DELETE">>,
User = #user{username = Username},
ConnPid,
{PermCache0, TopicPermCache}) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
QName = queue_resource(Vhost, QNameBin),
ok = prohibit_cr_lf(QNameBin),
PermCache = check_resource_access(QName, configure, User, PermCache0),
Expand All @@ -274,7 +274,7 @@ handle_http_req(<<"DELETE">>,
User = #user{username = Username},
_ConnPid,
{PermCache0, TopicPermCache}) ->
XNameBin = uri_string:unquote(XNameBinQuoted),
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
XName = exchange_resource(Vhost, XNameBin),
ok = prohibit_cr_lf(XNameBin),
ok = prohibit_default_exchange(XName),
Expand Down Expand Up @@ -594,9 +594,9 @@ decode_binding_path_segment(Segment) ->
end,
case re:run(Segment, MP, [{capture, all_but_first, binary}]) of
{match, [SrcQ, <<DstKindChar>>, DstQ, KeyQ, ArgsHash]} ->
Src = uri_string:unquote(SrcQ),
Dst = uri_string:unquote(DstQ),
Key = uri_string:unquote(KeyQ),
Src = rabbit_uri:urldecode(SrcQ),
Dst = rabbit_uri:urldecode(DstQ),
Key = rabbit_uri:urldecode(KeyQ),
DstKind = destination_char_to_kind(DstKindChar),
{Src, DstKind, Dst, Key, ArgsHash};
nomatch ->
Expand Down
227 changes: 124 additions & 103 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
-rabbit_deprecated_feature(
{amqp_address_v1,
#{deprecation_phase => permitted_by_default,
doc_url => "https://www.rabbitmq.com/docs/next/amqp#address",
messages =>
#{when_permitted =>
"RabbitMQ AMQP address version 1 is deprecated. "
"Clients should use RabbitMQ AMQP address version 2."}}
"Clients should use RabbitMQ AMQP address version 2.",
when_denied =>
"RabbitMQ AMQP address version 1 is unsupported. "
"Clients must use RabbitMQ AMQP address version 2."
}}
}).

-define(PROTOCOL, amqp10).
Expand Down Expand Up @@ -2422,12 +2427,24 @@ ensure_source(#'v1_0.source'{address = Address,
durable = Durable},
Vhost, User, PermCache, TopicPermCache) ->
case Address of
{utf8, <<"/q/", QNameBinQuoted/binary>>} ->
%% The only possible v2 source address format is:
%% /q/:queue
try rabbit_uri:urldecode(QNameBinQuoted) of
QNameBin ->
QName = queue_resource(Vhost, QNameBin),
ok = exit_if_absent(QName),
{ok, QName, PermCache, TopicPermCache}
catch error:_ ->
{error, {bad_address, Address}}
end;
{utf8, SourceAddr} ->
case address_v1_permitted() of
true -> ensure_source_v1(
SourceAddr, Vhost, User, Durable, PermCache, TopicPermCache);
false -> ensure_source_v2(
SourceAddr, Vhost, PermCache, TopicPermCache)
true ->
ensure_source_v1(SourceAddr, Vhost, User, Durable,
PermCache, TopicPermCache);
false ->
{error, {amqp_address_v1_not_permitted, Address}}
end;
_ ->
{error, {bad_address, Address}}
Expand Down Expand Up @@ -2467,19 +2484,10 @@ ensure_source_v1(Address,
Err
end
end;
{error, _} ->
ensure_source_v2(Address, Vhost, PermCache0, TopicPermCache0)
{error, _} = Err ->
Err
end.

%% The only possible v2 source address format is:
%% /queue/:queue
ensure_source_v2(<<"/queue/", QNameBin/binary>>, Vhost, PermCache, TopicPermCache) ->
QName = queue_resource(Vhost, QNameBin),
ok = exit_if_absent(QName),
{ok, QName, PermCache, TopicPermCache};
ensure_source_v2(Address, _, _, _) ->
{error, {bad_address, Address}}.

-spec ensure_target(#'v1_0.target'{},
rabbit_types:vhost(),
rabbit_types:user(),
Expand All @@ -2495,29 +2503,28 @@ ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) ->
ensure_target(#'v1_0.target'{address = Address,
durable = Durable},
Vhost, User, PermCache) ->
case address_v1_permitted() of
true ->
try_target_v1(Address, Vhost, User, Durable, PermCache);
false ->
try_target_v2(Address, Vhost, User, PermCache)
end.

try_target_v1(Address, Vhost, User, Durable, PermCache0) ->
case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of
{ok, XNameBin, RKey, QNameBin, PermCache} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
{error, _} ->
try_target_v2(Address, Vhost, User, PermCache0)
end.

try_target_v2(Address, Vhost, User, PermCache) ->
case ensure_target_v2(Address, Vhost) of
{ok, to, RKey, QNameBin} ->
{ok, to, RKey, QNameBin, PermCache};
{ok, XNameBin, RKey, QNameBin} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
{error, _} = Err ->
Err
case target_address_version(Address) of
2 ->
case ensure_target_v2(Address, Vhost) of
{ok, to, RKey, QNameBin} ->
{ok, to, RKey, QNameBin, PermCache};
{ok, XNameBin, RKey, QNameBin} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
{error, _} = Err ->
Err
end;
1 ->
case address_v1_permitted() of
true ->
case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of
{ok, XNameBin, RKey, QNameBin, PermCache1} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1);
{error, _} = Err ->
Err
end;
false ->
{error, {amqp_address_v1_not_permitted, Address}}
end
end.

check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
Expand All @@ -2539,29 +2546,24 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
exit_not_found(XName)
end.

ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) ->
case rabbit_routing_parser:parse_endpoint(Address, true) of
{ok, Dest} ->
{QNameBin, PermCache} = ensure_terminus(
target, Dest, Vhost, User, Durable, PermCache0),
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
XNameBin = unicode:characters_to_binary(XNameList1),
RoutingKey = case RK of
undefined -> subject;
[] -> subject;
_ -> unicode:characters_to_binary(RK)
end,
{ok, XNameBin, RoutingKey, QNameBin, PermCache};
{error, _} = Err ->
Err
end;
ensure_target_v1(Address, _, _, _, _) ->
{error, {bad_address, Address}}.
address_v1_permitted() ->
rabbit_deprecated_features:is_permitted(amqp_address_v1).

target_address_version({utf8, <<"/e/", _/binary>>}) ->
2;
target_address_version({utf8, <<"/q/", _/binary>>}) ->
2;
target_address_version(undefined) ->
%% anonymous terminus
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
2;
target_address_version(_Address) ->
1.

%% The possible v2 target address formats are:
%% /exchange/:exchange/key/:routing-key
%% /exchange/:exchange
%% /queue/:queue
%% /e/:exchange/:routing-key
%% /e/:exchange
%% /q/:queue
%% <null>
ensure_target_v2({utf8, String}, Vhost) ->
case parse_target_v2_string(String) of
Expand All @@ -2576,42 +2578,64 @@ ensure_target_v2({utf8, String}, Vhost) ->
ensure_target_v2(undefined, _) ->
%% anonymous terminus
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
{ok, to, to, undefined};
ensure_target_v2(Address, _) ->
{error, {bad_address, Address}}.
{ok, to, to, undefined}.

parse_target_v2_string(String) ->
try parse_target_v2_string0(String)
catch error:_ ->
{error, bad_address}
end.

parse_target_v2_string(<<"/exchange/", Rest/binary>>) ->
case split_exchange_target(Rest) of
{?DEFAULT_EXCHANGE_NAME, _} ->
parse_target_v2_string0(<<"/e/", Rest/binary>>) ->
Key = cp_slash,
Pattern = try persistent_term:get(Key)
catch error:badarg ->
Cp = binary:compile_pattern(<<"/">>),
ok = persistent_term:put(Key, Cp),
Cp
end,
case binary:split(Rest, Pattern, [global]) of
[?DEFAULT_EXCHANGE_NAME | _] ->
{error, bad_address};
{<<"amq.default">>, _} ->
[<<"amq.default">> | _] ->
{error, bad_address};
{XNameBin, RKey} ->
{ok, XNameBin, RKey, undefined}
[XNameBinQuoted] ->
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
{ok, XNameBin, <<>>, undefined};
[XNameBinQuoted, RKeyQuoted] ->
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
RKey = rabbit_uri:urldecode(RKeyQuoted),
{ok, XNameBin, RKey, undefined};
_ ->
{error, bad_address}
end;
parse_target_v2_string(<<"/queue/">>) ->
parse_target_v2_string0(<<"/q/">>) ->
%% empty queue name is invalid
{error, bad_address};
parse_target_v2_string(<<"/queue/", QNameBin/binary>>) ->
parse_target_v2_string0(<<"/q/", QNameBinQuoted/binary>>) ->
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
{ok, ?DEFAULT_EXCHANGE_NAME, QNameBin, QNameBin};
parse_target_v2_string(_) ->
parse_target_v2_string0(_) ->
{error, bad_address}.

%% Empty exchange name (default exchange) is valid.
split_exchange_target(Target) ->
Key = cp_amqp_target_address,
Pattern = try persistent_term:get(Key)
catch error:badarg ->
Cp = binary:compile_pattern(<<"/key/">>),
ok = persistent_term:put(Key, Cp),
Cp
end,
case binary:split(Target, Pattern) of
[XNameBin] ->
{XNameBin, <<>>};
[XNameBin, RoutingKey] ->
{XNameBin, RoutingKey}
end.
ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) ->
case rabbit_routing_parser:parse_endpoint(Address, true) of
{ok, Dest} ->
{QNameBin, PermCache} = ensure_terminus(
target, Dest, Vhost, User, Durable, PermCache0),
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
XNameBin = unicode:characters_to_binary(XNameList1),
RoutingKey = case RK of
undefined -> subject;
[] -> subject;
_ -> unicode:characters_to_binary(RK)
end,
{ok, XNameBin, RoutingKey, QNameBin, PermCache};
{error, _} = Err ->
Err
end;
ensure_target_v1(Address, _, _, _, _) ->
{error, {bad_address, Address}}.

handle_outgoing_mgmt_link_flow_control(
#management_link{delivery_count = DeliveryCountSnd} = Link0,
Expand Down Expand Up @@ -3355,14 +3379,24 @@ error_not_found(Resource) ->
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, Description}}.

address_v1_permitted() ->
rabbit_deprecated_features:is_permitted(amqp_address_v1).

-spec cap_credit(rabbit_queue_type:credit()) ->
0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX.
cap_credit(DesiredCredit) ->
min(DesiredCredit, ?LINK_CREDIT_RCV_FROM_QUEUE_MAX).

ensure_mc_cluster_compat(Mc) ->
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
case IsEnabled of
true ->
Mc;
false ->
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
%% other nodes in the cluster may not understand the new internal
%% amqp mc format - in this case we convert to AMQP legacy format
%% for compatibility
mc:convert(mc_amqpl, Mc, McEnv)
end.

format_status(
#{state := #state{cfg = Cfg,
outgoing_pending = OutgoingPending,
Expand Down Expand Up @@ -3407,16 +3441,3 @@ format_status(
permission_cache => PermissionCache,
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

ensure_mc_cluster_compat(Mc) ->
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
case IsEnabled of
true ->
Mc;
false ->
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
%% other nodes in the cluster may not understand the new internal
%% amqp mc format - in this case we convert to AMQP legacy format
%% for compatibility
mc:convert(mc_amqpl, Mc, McEnv)
end.
Loading
Loading