Skip to content

Commit

Permalink
Merge pull request #11604 from rabbitmq/amqp-addr
Browse files Browse the repository at this point in the history
Use different AMQP address format for v1 and v2
  • Loading branch information
michaelklishin committed Jul 3, 2024
2 parents c9956b0 + 7b18bd7 commit 20aee3f
Show file tree
Hide file tree
Showing 12 changed files with 448 additions and 185 deletions.
3 changes: 3 additions & 0 deletions deps/rabbit/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_trace.erl",
"src/rabbit_tracking_store.erl",
"src/rabbit_upgrade_preparation.erl",
"src/rabbit_uri.erl",
"src/rabbit_variable_queue.erl",
"src/rabbit_version.erl",
"src/rabbit_vhost.erl",
Expand Down Expand Up @@ -481,6 +482,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_trace.erl",
"src/rabbit_tracking_store.erl",
"src/rabbit_upgrade_preparation.erl",
"src/rabbit_uri.erl",
"src/rabbit_variable_queue.erl",
"src/rabbit_version.erl",
"src/rabbit_vhost.erl",
Expand Down Expand Up @@ -762,6 +764,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_tracking.erl",
"src/rabbit_tracking_store.erl",
"src/rabbit_upgrade_preparation.erl",
"src/rabbit_uri.erl",
"src/rabbit_variable_queue.erl",
"src/rabbit_version.erl",
"src/rabbit_vhost.erl",
Expand Down
18 changes: 9 additions & 9 deletions deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ handle_http_req(<<"GET">>,
_User,
_ConnPid,
PermCaches) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
QName = queue_resource(Vhost, QNameBin),
case rabbit_amqqueue:with(
QName,
Expand Down Expand Up @@ -110,7 +110,7 @@ handle_http_req(HttpMethod = <<"PUT">>,
exclusive := Exclusive,
arguments := QArgs0
} = decode_queue(ReqPayload),
QNameBin = uri_string:unquote(QNameBinQuoted),
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
Owner = case Exclusive of
true -> ConnPid;
false -> none
Expand Down Expand Up @@ -185,7 +185,7 @@ handle_http_req(<<"PUT">>,
User = #user{username = Username},
_ConnPid,
{PermCache0, TopicPermCache}) ->
XNameBin = uri_string:unquote(XNameBinQuoted),
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
#{type := XTypeBin,
durable := Durable,
auto_delete := AutoDelete,
Expand Down Expand Up @@ -226,7 +226,7 @@ handle_http_req(<<"DELETE">>,
User,
ConnPid,
{PermCache0, TopicPermCache}) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
QName = queue_resource(Vhost, QNameBin),
PermCache = check_resource_access(QName, read, User, PermCache0),
try rabbit_amqqueue:with_exclusive_access_or_die(
Expand Down Expand Up @@ -254,7 +254,7 @@ handle_http_req(<<"DELETE">>,
User = #user{username = Username},
ConnPid,
{PermCache0, TopicPermCache}) ->
QNameBin = uri_string:unquote(QNameBinQuoted),
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
QName = queue_resource(Vhost, QNameBin),
ok = prohibit_cr_lf(QNameBin),
PermCache = check_resource_access(QName, configure, User, PermCache0),
Expand All @@ -274,7 +274,7 @@ handle_http_req(<<"DELETE">>,
User = #user{username = Username},
_ConnPid,
{PermCache0, TopicPermCache}) ->
XNameBin = uri_string:unquote(XNameBinQuoted),
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
XName = exchange_resource(Vhost, XNameBin),
ok = prohibit_cr_lf(XNameBin),
ok = prohibit_default_exchange(XName),
Expand Down Expand Up @@ -594,9 +594,9 @@ decode_binding_path_segment(Segment) ->
end,
case re:run(Segment, MP, [{capture, all_but_first, binary}]) of
{match, [SrcQ, <<DstKindChar>>, DstQ, KeyQ, ArgsHash]} ->
Src = uri_string:unquote(SrcQ),
Dst = uri_string:unquote(DstQ),
Key = uri_string:unquote(KeyQ),
Src = rabbit_uri:urldecode(SrcQ),
Dst = rabbit_uri:urldecode(DstQ),
Key = rabbit_uri:urldecode(KeyQ),
DstKind = destination_char_to_kind(DstKindChar),
{Src, DstKind, Dst, Key, ArgsHash};
nomatch ->
Expand Down
227 changes: 124 additions & 103 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
-rabbit_deprecated_feature(
{amqp_address_v1,
#{deprecation_phase => permitted_by_default,
doc_url => "https://www.rabbitmq.com/docs/next/amqp#address",
messages =>
#{when_permitted =>
"RabbitMQ AMQP address version 1 is deprecated. "
"Clients should use RabbitMQ AMQP address version 2."}}
"Clients should use RabbitMQ AMQP address version 2.",
when_denied =>
"RabbitMQ AMQP address version 1 is unsupported. "
"Clients must use RabbitMQ AMQP address version 2."
}}
}).

-define(PROTOCOL, amqp10).
Expand Down Expand Up @@ -2422,12 +2427,24 @@ ensure_source(#'v1_0.source'{address = Address,
durable = Durable},
Vhost, User, PermCache, TopicPermCache) ->
case Address of
{utf8, <<"/q/", QNameBinQuoted/binary>>} ->
%% The only possible v2 source address format is:
%% /q/:queue
try rabbit_uri:urldecode(QNameBinQuoted) of
QNameBin ->
QName = queue_resource(Vhost, QNameBin),
ok = exit_if_absent(QName),
{ok, QName, PermCache, TopicPermCache}
catch error:_ ->
{error, {bad_address, Address}}
end;
{utf8, SourceAddr} ->
case address_v1_permitted() of
true -> ensure_source_v1(
SourceAddr, Vhost, User, Durable, PermCache, TopicPermCache);
false -> ensure_source_v2(
SourceAddr, Vhost, PermCache, TopicPermCache)
true ->
ensure_source_v1(SourceAddr, Vhost, User, Durable,
PermCache, TopicPermCache);
false ->
{error, {amqp_address_v1_not_permitted, Address}}
end;
_ ->
{error, {bad_address, Address}}
Expand Down Expand Up @@ -2467,19 +2484,10 @@ ensure_source_v1(Address,
Err
end
end;
{error, _} ->
ensure_source_v2(Address, Vhost, PermCache0, TopicPermCache0)
{error, _} = Err ->
Err
end.

%% The only possible v2 source address format is:
%% /queue/:queue
ensure_source_v2(<<"/queue/", QNameBin/binary>>, Vhost, PermCache, TopicPermCache) ->
QName = queue_resource(Vhost, QNameBin),
ok = exit_if_absent(QName),
{ok, QName, PermCache, TopicPermCache};
ensure_source_v2(Address, _, _, _) ->
{error, {bad_address, Address}}.

-spec ensure_target(#'v1_0.target'{},
rabbit_types:vhost(),
rabbit_types:user(),
Expand All @@ -2495,29 +2503,28 @@ ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) ->
ensure_target(#'v1_0.target'{address = Address,
durable = Durable},
Vhost, User, PermCache) ->
case address_v1_permitted() of
true ->
try_target_v1(Address, Vhost, User, Durable, PermCache);
false ->
try_target_v2(Address, Vhost, User, PermCache)
end.

try_target_v1(Address, Vhost, User, Durable, PermCache0) ->
case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of
{ok, XNameBin, RKey, QNameBin, PermCache} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
{error, _} ->
try_target_v2(Address, Vhost, User, PermCache0)
end.

try_target_v2(Address, Vhost, User, PermCache) ->
case ensure_target_v2(Address, Vhost) of
{ok, to, RKey, QNameBin} ->
{ok, to, RKey, QNameBin, PermCache};
{ok, XNameBin, RKey, QNameBin} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
{error, _} = Err ->
Err
case target_address_version(Address) of
2 ->
case ensure_target_v2(Address, Vhost) of
{ok, to, RKey, QNameBin} ->
{ok, to, RKey, QNameBin, PermCache};
{ok, XNameBin, RKey, QNameBin} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
{error, _} = Err ->
Err
end;
1 ->
case address_v1_permitted() of
true ->
case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of
{ok, XNameBin, RKey, QNameBin, PermCache1} ->
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1);
{error, _} = Err ->
Err
end;
false ->
{error, {amqp_address_v1_not_permitted, Address}}
end
end.

check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
Expand All @@ -2539,29 +2546,24 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
exit_not_found(XName)
end.

ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) ->
case rabbit_routing_parser:parse_endpoint(Address, true) of
{ok, Dest} ->
{QNameBin, PermCache} = ensure_terminus(
target, Dest, Vhost, User, Durable, PermCache0),
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
XNameBin = unicode:characters_to_binary(XNameList1),
RoutingKey = case RK of
undefined -> subject;
[] -> subject;
_ -> unicode:characters_to_binary(RK)
end,
{ok, XNameBin, RoutingKey, QNameBin, PermCache};
{error, _} = Err ->
Err
end;
ensure_target_v1(Address, _, _, _, _) ->
{error, {bad_address, Address}}.
address_v1_permitted() ->
rabbit_deprecated_features:is_permitted(amqp_address_v1).

target_address_version({utf8, <<"/e/", _/binary>>}) ->
2;
target_address_version({utf8, <<"/q/", _/binary>>}) ->
2;
target_address_version(undefined) ->
%% anonymous terminus
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
2;
target_address_version(_Address) ->
1.

%% The possible v2 target address formats are:
%% /exchange/:exchange/key/:routing-key
%% /exchange/:exchange
%% /queue/:queue
%% /e/:exchange/:routing-key
%% /e/:exchange
%% /q/:queue
%% <null>
ensure_target_v2({utf8, String}, Vhost) ->
case parse_target_v2_string(String) of
Expand All @@ -2576,42 +2578,64 @@ ensure_target_v2({utf8, String}, Vhost) ->
ensure_target_v2(undefined, _) ->
%% anonymous terminus
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
{ok, to, to, undefined};
ensure_target_v2(Address, _) ->
{error, {bad_address, Address}}.
{ok, to, to, undefined}.

parse_target_v2_string(String) ->
try parse_target_v2_string0(String)
catch error:_ ->
{error, bad_address}
end.

parse_target_v2_string(<<"/exchange/", Rest/binary>>) ->
case split_exchange_target(Rest) of
{?DEFAULT_EXCHANGE_NAME, _} ->
parse_target_v2_string0(<<"/e/", Rest/binary>>) ->
Key = cp_slash,
Pattern = try persistent_term:get(Key)
catch error:badarg ->
Cp = binary:compile_pattern(<<"/">>),
ok = persistent_term:put(Key, Cp),
Cp
end,
case binary:split(Rest, Pattern, [global]) of
[?DEFAULT_EXCHANGE_NAME | _] ->
{error, bad_address};
{<<"amq.default">>, _} ->
[<<"amq.default">> | _] ->
{error, bad_address};
{XNameBin, RKey} ->
{ok, XNameBin, RKey, undefined}
[XNameBinQuoted] ->
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
{ok, XNameBin, <<>>, undefined};
[XNameBinQuoted, RKeyQuoted] ->
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
RKey = rabbit_uri:urldecode(RKeyQuoted),
{ok, XNameBin, RKey, undefined};
_ ->
{error, bad_address}
end;
parse_target_v2_string(<<"/queue/">>) ->
parse_target_v2_string0(<<"/q/">>) ->
%% empty queue name is invalid
{error, bad_address};
parse_target_v2_string(<<"/queue/", QNameBin/binary>>) ->
parse_target_v2_string0(<<"/q/", QNameBinQuoted/binary>>) ->
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
{ok, ?DEFAULT_EXCHANGE_NAME, QNameBin, QNameBin};
parse_target_v2_string(_) ->
parse_target_v2_string0(_) ->
{error, bad_address}.

%% Empty exchange name (default exchange) is valid.
split_exchange_target(Target) ->
Key = cp_amqp_target_address,
Pattern = try persistent_term:get(Key)
catch error:badarg ->
Cp = binary:compile_pattern(<<"/key/">>),
ok = persistent_term:put(Key, Cp),
Cp
end,
case binary:split(Target, Pattern) of
[XNameBin] ->
{XNameBin, <<>>};
[XNameBin, RoutingKey] ->
{XNameBin, RoutingKey}
end.
ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) ->
case rabbit_routing_parser:parse_endpoint(Address, true) of
{ok, Dest} ->
{QNameBin, PermCache} = ensure_terminus(
target, Dest, Vhost, User, Durable, PermCache0),
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
XNameBin = unicode:characters_to_binary(XNameList1),
RoutingKey = case RK of
undefined -> subject;
[] -> subject;
_ -> unicode:characters_to_binary(RK)
end,
{ok, XNameBin, RoutingKey, QNameBin, PermCache};
{error, _} = Err ->
Err
end;
ensure_target_v1(Address, _, _, _, _) ->
{error, {bad_address, Address}}.

handle_outgoing_mgmt_link_flow_control(
#management_link{delivery_count = DeliveryCountSnd} = Link0,
Expand Down Expand Up @@ -3355,14 +3379,24 @@ error_not_found(Resource) ->
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
description = {utf8, Description}}.

address_v1_permitted() ->
rabbit_deprecated_features:is_permitted(amqp_address_v1).

-spec cap_credit(rabbit_queue_type:credit()) ->
0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX.
cap_credit(DesiredCredit) ->
min(DesiredCredit, ?LINK_CREDIT_RCV_FROM_QUEUE_MAX).

ensure_mc_cluster_compat(Mc) ->
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
case IsEnabled of
true ->
Mc;
false ->
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
%% other nodes in the cluster may not understand the new internal
%% amqp mc format - in this case we convert to AMQP legacy format
%% for compatibility
mc:convert(mc_amqpl, Mc, McEnv)
end.

format_status(
#{state := #state{cfg = Cfg,
outgoing_pending = OutgoingPending,
Expand Down Expand Up @@ -3407,16 +3441,3 @@ format_status(
permission_cache => PermissionCache,
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

ensure_mc_cluster_compat(Mc) ->
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
case IsEnabled of
true ->
Mc;
false ->
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
%% other nodes in the cluster may not understand the new internal
%% amqp mc format - in this case we convert to AMQP legacy format
%% for compatibility
mc:convert(mc_amqpl, Mc, McEnv)
end.
Loading

0 comments on commit 20aee3f

Please sign in to comment.