diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index ff67f1a180c9..e995d68782c0 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -223,6 +223,7 @@ def all_beam_files(name = "all_beam_files"): "src/rabbit_trace.erl", "src/rabbit_tracking_store.erl", "src/rabbit_upgrade_preparation.erl", + "src/rabbit_uri.erl", "src/rabbit_variable_queue.erl", "src/rabbit_version.erl", "src/rabbit_vhost.erl", @@ -481,6 +482,7 @@ def all_test_beam_files(name = "all_test_beam_files"): "src/rabbit_trace.erl", "src/rabbit_tracking_store.erl", "src/rabbit_upgrade_preparation.erl", + "src/rabbit_uri.erl", "src/rabbit_variable_queue.erl", "src/rabbit_version.erl", "src/rabbit_vhost.erl", @@ -762,6 +764,7 @@ def all_srcs(name = "all_srcs"): "src/rabbit_tracking.erl", "src/rabbit_tracking_store.erl", "src/rabbit_upgrade_preparation.erl", + "src/rabbit_uri.erl", "src/rabbit_variable_queue.erl", "src/rabbit_version.erl", "src/rabbit_vhost.erl", diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index 8b3f739e1790..118cfe1cfccf 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -80,7 +80,7 @@ handle_http_req(<<"GET">>, _User, _ConnPid, PermCaches) -> - QNameBin = uri_string:unquote(QNameBinQuoted), + QNameBin = rabbit_uri:urldecode(QNameBinQuoted), QName = queue_resource(Vhost, QNameBin), case rabbit_amqqueue:with( QName, @@ -110,7 +110,7 @@ handle_http_req(HttpMethod = <<"PUT">>, exclusive := Exclusive, arguments := QArgs0 } = decode_queue(ReqPayload), - QNameBin = uri_string:unquote(QNameBinQuoted), + QNameBin = rabbit_uri:urldecode(QNameBinQuoted), Owner = case Exclusive of true -> ConnPid; false -> none @@ -185,7 +185,7 @@ handle_http_req(<<"PUT">>, User = #user{username = Username}, _ConnPid, {PermCache0, TopicPermCache}) -> - XNameBin = uri_string:unquote(XNameBinQuoted), + XNameBin = rabbit_uri:urldecode(XNameBinQuoted), #{type := XTypeBin, durable := Durable, auto_delete := AutoDelete, @@ -226,7 +226,7 @@ handle_http_req(<<"DELETE">>, User, ConnPid, {PermCache0, TopicPermCache}) -> - QNameBin = uri_string:unquote(QNameBinQuoted), + QNameBin = rabbit_uri:urldecode(QNameBinQuoted), QName = queue_resource(Vhost, QNameBin), PermCache = check_resource_access(QName, read, User, PermCache0), try rabbit_amqqueue:with_exclusive_access_or_die( @@ -254,7 +254,7 @@ handle_http_req(<<"DELETE">>, User = #user{username = Username}, ConnPid, {PermCache0, TopicPermCache}) -> - QNameBin = uri_string:unquote(QNameBinQuoted), + QNameBin = rabbit_uri:urldecode(QNameBinQuoted), QName = queue_resource(Vhost, QNameBin), ok = prohibit_cr_lf(QNameBin), PermCache = check_resource_access(QName, configure, User, PermCache0), @@ -274,7 +274,7 @@ handle_http_req(<<"DELETE">>, User = #user{username = Username}, _ConnPid, {PermCache0, TopicPermCache}) -> - XNameBin = uri_string:unquote(XNameBinQuoted), + XNameBin = rabbit_uri:urldecode(XNameBinQuoted), XName = exchange_resource(Vhost, XNameBin), ok = prohibit_cr_lf(XNameBin), ok = prohibit_default_exchange(XName), @@ -594,9 +594,9 @@ decode_binding_path_segment(Segment) -> end, case re:run(Segment, MP, [{capture, all_but_first, binary}]) of {match, [SrcQ, <>, DstQ, KeyQ, ArgsHash]} -> - Src = uri_string:unquote(SrcQ), - Dst = uri_string:unquote(DstQ), - Key = uri_string:unquote(KeyQ), + Src = rabbit_uri:urldecode(SrcQ), + Dst = rabbit_uri:urldecode(DstQ), + Key = rabbit_uri:urldecode(KeyQ), DstKind = destination_char_to_kind(DstKindChar), {Src, DstKind, Dst, Key, ArgsHash}; nomatch -> diff --git a/deps/rabbit/src/rabbit_amqp_session.erl b/deps/rabbit/src/rabbit_amqp_session.erl index 0509da510554..4cf355a1d3ba 100644 --- a/deps/rabbit/src/rabbit_amqp_session.erl +++ b/deps/rabbit/src/rabbit_amqp_session.erl @@ -19,10 +19,15 @@ -rabbit_deprecated_feature( {amqp_address_v1, #{deprecation_phase => permitted_by_default, + doc_url => "https://www.rabbitmq.com/docs/next/amqp#address", messages => #{when_permitted => "RabbitMQ AMQP address version 1 is deprecated. " - "Clients should use RabbitMQ AMQP address version 2."}} + "Clients should use RabbitMQ AMQP address version 2.", + when_denied => + "RabbitMQ AMQP address version 1 is unsupported. " + "Clients must use RabbitMQ AMQP address version 2." + }} }). -define(PROTOCOL, amqp10). @@ -2422,12 +2427,24 @@ ensure_source(#'v1_0.source'{address = Address, durable = Durable}, Vhost, User, PermCache, TopicPermCache) -> case Address of + {utf8, <<"/q/", QNameBinQuoted/binary>>} -> + %% The only possible v2 source address format is: + %% /q/:queue + try rabbit_uri:urldecode(QNameBinQuoted) of + QNameBin -> + QName = queue_resource(Vhost, QNameBin), + ok = exit_if_absent(QName), + {ok, QName, PermCache, TopicPermCache} + catch error:_ -> + {error, {bad_address, Address}} + end; {utf8, SourceAddr} -> case address_v1_permitted() of - true -> ensure_source_v1( - SourceAddr, Vhost, User, Durable, PermCache, TopicPermCache); - false -> ensure_source_v2( - SourceAddr, Vhost, PermCache, TopicPermCache) + true -> + ensure_source_v1(SourceAddr, Vhost, User, Durable, + PermCache, TopicPermCache); + false -> + {error, {amqp_address_v1_not_permitted, Address}} end; _ -> {error, {bad_address, Address}} @@ -2467,19 +2484,10 @@ ensure_source_v1(Address, Err end end; - {error, _} -> - ensure_source_v2(Address, Vhost, PermCache0, TopicPermCache0) + {error, _} = Err -> + Err end. -%% The only possible v2 source address format is: -%% /queue/:queue -ensure_source_v2(<<"/queue/", QNameBin/binary>>, Vhost, PermCache, TopicPermCache) -> - QName = queue_resource(Vhost, QNameBin), - ok = exit_if_absent(QName), - {ok, QName, PermCache, TopicPermCache}; -ensure_source_v2(Address, _, _, _) -> - {error, {bad_address, Address}}. - -spec ensure_target(#'v1_0.target'{}, rabbit_types:vhost(), rabbit_types:user(), @@ -2495,29 +2503,28 @@ ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) -> ensure_target(#'v1_0.target'{address = Address, durable = Durable}, Vhost, User, PermCache) -> - case address_v1_permitted() of - true -> - try_target_v1(Address, Vhost, User, Durable, PermCache); - false -> - try_target_v2(Address, Vhost, User, PermCache) - end. - -try_target_v1(Address, Vhost, User, Durable, PermCache0) -> - case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of - {ok, XNameBin, RKey, QNameBin, PermCache} -> - check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache); - {error, _} -> - try_target_v2(Address, Vhost, User, PermCache0) - end. - -try_target_v2(Address, Vhost, User, PermCache) -> - case ensure_target_v2(Address, Vhost) of - {ok, to, RKey, QNameBin} -> - {ok, to, RKey, QNameBin, PermCache}; - {ok, XNameBin, RKey, QNameBin} -> - check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache); - {error, _} = Err -> - Err + case target_address_version(Address) of + 2 -> + case ensure_target_v2(Address, Vhost) of + {ok, to, RKey, QNameBin} -> + {ok, to, RKey, QNameBin, PermCache}; + {ok, XNameBin, RKey, QNameBin} -> + check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache); + {error, _} = Err -> + Err + end; + 1 -> + case address_v1_permitted() of + true -> + case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of + {ok, XNameBin, RKey, QNameBin, PermCache1} -> + check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1); + {error, _} = Err -> + Err + end; + false -> + {error, {amqp_address_v1_not_permitted, Address}} + end end. check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> @@ -2539,29 +2546,24 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) -> exit_not_found(XName) end. -ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) -> - case rabbit_routing_parser:parse_endpoint(Address, true) of - {ok, Dest} -> - {QNameBin, PermCache} = ensure_terminus( - target, Dest, Vhost, User, Durable, PermCache0), - {XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest), - XNameBin = unicode:characters_to_binary(XNameList1), - RoutingKey = case RK of - undefined -> subject; - [] -> subject; - _ -> unicode:characters_to_binary(RK) - end, - {ok, XNameBin, RoutingKey, QNameBin, PermCache}; - {error, _} = Err -> - Err - end; -ensure_target_v1(Address, _, _, _, _) -> - {error, {bad_address, Address}}. +address_v1_permitted() -> + rabbit_deprecated_features:is_permitted(amqp_address_v1). + +target_address_version({utf8, <<"/e/", _/binary>>}) -> + 2; +target_address_version({utf8, <<"/q/", _/binary>>}) -> + 2; +target_address_version(undefined) -> + %% anonymous terminus + %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay + 2; +target_address_version(_Address) -> + 1. %% The possible v2 target address formats are: -%% /exchange/:exchange/key/:routing-key -%% /exchange/:exchange -%% /queue/:queue +%% /e/:exchange/:routing-key +%% /e/:exchange +%% /q/:queue %% ensure_target_v2({utf8, String}, Vhost) -> case parse_target_v2_string(String) of @@ -2576,42 +2578,64 @@ ensure_target_v2({utf8, String}, Vhost) -> ensure_target_v2(undefined, _) -> %% anonymous terminus %% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay - {ok, to, to, undefined}; -ensure_target_v2(Address, _) -> - {error, {bad_address, Address}}. + {ok, to, to, undefined}. + +parse_target_v2_string(String) -> + try parse_target_v2_string0(String) + catch error:_ -> + {error, bad_address} + end. -parse_target_v2_string(<<"/exchange/", Rest/binary>>) -> - case split_exchange_target(Rest) of - {?DEFAULT_EXCHANGE_NAME, _} -> +parse_target_v2_string0(<<"/e/", Rest/binary>>) -> + Key = cp_slash, + Pattern = try persistent_term:get(Key) + catch error:badarg -> + Cp = binary:compile_pattern(<<"/">>), + ok = persistent_term:put(Key, Cp), + Cp + end, + case binary:split(Rest, Pattern, [global]) of + [?DEFAULT_EXCHANGE_NAME | _] -> {error, bad_address}; - {<<"amq.default">>, _} -> + [<<"amq.default">> | _] -> {error, bad_address}; - {XNameBin, RKey} -> - {ok, XNameBin, RKey, undefined} + [XNameBinQuoted] -> + XNameBin = rabbit_uri:urldecode(XNameBinQuoted), + {ok, XNameBin, <<>>, undefined}; + [XNameBinQuoted, RKeyQuoted] -> + XNameBin = rabbit_uri:urldecode(XNameBinQuoted), + RKey = rabbit_uri:urldecode(RKeyQuoted), + {ok, XNameBin, RKey, undefined}; + _ -> + {error, bad_address} end; -parse_target_v2_string(<<"/queue/">>) -> +parse_target_v2_string0(<<"/q/">>) -> %% empty queue name is invalid {error, bad_address}; -parse_target_v2_string(<<"/queue/", QNameBin/binary>>) -> +parse_target_v2_string0(<<"/q/", QNameBinQuoted/binary>>) -> + QNameBin = rabbit_uri:urldecode(QNameBinQuoted), {ok, ?DEFAULT_EXCHANGE_NAME, QNameBin, QNameBin}; -parse_target_v2_string(_) -> +parse_target_v2_string0(_) -> {error, bad_address}. -%% Empty exchange name (default exchange) is valid. -split_exchange_target(Target) -> - Key = cp_amqp_target_address, - Pattern = try persistent_term:get(Key) - catch error:badarg -> - Cp = binary:compile_pattern(<<"/key/">>), - ok = persistent_term:put(Key, Cp), - Cp - end, - case binary:split(Target, Pattern) of - [XNameBin] -> - {XNameBin, <<>>}; - [XNameBin, RoutingKey] -> - {XNameBin, RoutingKey} - end. +ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) -> + case rabbit_routing_parser:parse_endpoint(Address, true) of + {ok, Dest} -> + {QNameBin, PermCache} = ensure_terminus( + target, Dest, Vhost, User, Durable, PermCache0), + {XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest), + XNameBin = unicode:characters_to_binary(XNameList1), + RoutingKey = case RK of + undefined -> subject; + [] -> subject; + _ -> unicode:characters_to_binary(RK) + end, + {ok, XNameBin, RoutingKey, QNameBin, PermCache}; + {error, _} = Err -> + Err + end; +ensure_target_v1(Address, _, _, _, _) -> + {error, {bad_address, Address}}. handle_outgoing_mgmt_link_flow_control( #management_link{delivery_count = DeliveryCountSnd} = Link0, @@ -3355,14 +3379,24 @@ error_not_found(Resource) -> condition = ?V_1_0_AMQP_ERROR_NOT_FOUND, description = {utf8, Description}}. -address_v1_permitted() -> - rabbit_deprecated_features:is_permitted(amqp_address_v1). - -spec cap_credit(rabbit_queue_type:credit()) -> 0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX. cap_credit(DesiredCredit) -> min(DesiredCredit, ?LINK_CREDIT_RCV_FROM_QUEUE_MAX). +ensure_mc_cluster_compat(Mc) -> + IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1), + case IsEnabled of + true -> + Mc; + false -> + McEnv = #{message_containers_store_amqp_v1 => IsEnabled}, + %% other nodes in the cluster may not understand the new internal + %% amqp mc format - in this case we convert to AMQP legacy format + %% for compatibility + mc:convert(mc_amqpl, Mc, McEnv) + end. + format_status( #{state := #state{cfg = Cfg, outgoing_pending = OutgoingPending, @@ -3407,16 +3441,3 @@ format_status( permission_cache => PermissionCache, topic_permission_cache => TopicPermissionCache}, maps:update(state, State, Status). - -ensure_mc_cluster_compat(Mc) -> - IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1), - case IsEnabled of - true -> - Mc; - false -> - McEnv = #{message_containers_store_amqp_v1 => IsEnabled}, - %% other nodes in the cluster may not understand the new internal - %% amqp mc format - in this case we convert to AMQP legacy format - %% for compatibility - mc:convert(mc_amqpl, Mc, McEnv) - end. diff --git a/deps/rabbit/src/rabbit_uri.erl b/deps/rabbit/src/rabbit_uri.erl new file mode 100644 index 000000000000..f1e2d028753f --- /dev/null +++ b/deps/rabbit/src/rabbit_uri.erl @@ -0,0 +1,154 @@ +%% Copyright (c) 2016-2024, LoΓ―c Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% ------------------------------------------------------------------------- %% +%% This file is a partial copy of +%% https://github.com/ninenines/cowlib/blob/optimise-urldecode/src/cow_uri.erl +%% We use this copy because: +%% 1. uri_string:unquote/1 is lax: It doesn't validate that characters that are +%% required to be percent encoded are indeed percent encoded. In RabbitMQ, +%% we want to enforce that proper percent encoding is done by AMQP clients. +%% 2. uri_string:unquote/1 and cow_uri:urldecode/1 in cowlib v2.13.0 are both +%% slow because they allocate a new binary for the common case where no +%% character was percent encoded. +%% When a new cowlib version is released, we should make app rabbit depend on +%% app cowlib calling cow_uri:urldecode/1 and delete this file (rabbit_uri.erl). +%% ------------------------------------------------------------------------- %% + +-module(rabbit_uri). + +-export([urldecode/1]). + +-define(UNHEX(H, L), (?UNHEX(H) bsl 4 bor ?UNHEX(L))). + +-define(UNHEX(C), + case C of + $0 -> 0; + $1 -> 1; + $2 -> 2; + $3 -> 3; + $4 -> 4; + $5 -> 5; + $6 -> 6; + $7 -> 7; + $8 -> 8; + $9 -> 9; + $A -> 10; + $B -> 11; + $C -> 12; + $D -> 13; + $E -> 14; + $F -> 15; + $a -> 10; + $b -> 11; + $c -> 12; + $d -> 13; + $e -> 14; + $f -> 15 + end +). + +%% Decode a percent encoded string. (RFC3986 2.1) +%% +%% Inspiration for some of the optimisations done here come +%% from the new `json` module as it was in mid-2024. +%% +%% Possible input includes: +%% +%% * nothing encoded (no % character): +%% We want to return the binary as-is to avoid an allocation. +%% +%% * small number of encoded characters: +%% We can "skip" words of text. +%% +%% * mostly encoded characters (non-ascii languages) +%% We can decode characters in bulk. + +-define(IS_PLAIN(C), ( + (C =:= $!) orelse (C =:= $$) orelse (C =:= $&) orelse (C =:= $') orelse + (C =:= $() orelse (C =:= $)) orelse (C =:= $*) orelse (C =:= $+) orelse + (C =:= $,) orelse (C =:= $-) orelse (C =:= $.) orelse (C =:= $0) orelse + (C =:= $1) orelse (C =:= $2) orelse (C =:= $3) orelse (C =:= $4) orelse + (C =:= $5) orelse (C =:= $6) orelse (C =:= $7) orelse (C =:= $8) orelse + (C =:= $9) orelse (C =:= $:) orelse (C =:= $;) orelse (C =:= $=) orelse + (C =:= $@) orelse (C =:= $A) orelse (C =:= $B) orelse (C =:= $C) orelse + (C =:= $D) orelse (C =:= $E) orelse (C =:= $F) orelse (C =:= $G) orelse + (C =:= $H) orelse (C =:= $I) orelse (C =:= $J) orelse (C =:= $K) orelse + (C =:= $L) orelse (C =:= $M) orelse (C =:= $N) orelse (C =:= $O) orelse + (C =:= $P) orelse (C =:= $Q) orelse (C =:= $R) orelse (C =:= $S) orelse + (C =:= $T) orelse (C =:= $U) orelse (C =:= $V) orelse (C =:= $W) orelse + (C =:= $X) orelse (C =:= $Y) orelse (C =:= $Z) orelse (C =:= $_) orelse + (C =:= $a) orelse (C =:= $b) orelse (C =:= $c) orelse (C =:= $d) orelse + (C =:= $e) orelse (C =:= $f) orelse (C =:= $g) orelse (C =:= $h) orelse + (C =:= $i) orelse (C =:= $j) orelse (C =:= $k) orelse (C =:= $l) orelse + (C =:= $m) orelse (C =:= $n) orelse (C =:= $o) orelse (C =:= $p) orelse + (C =:= $q) orelse (C =:= $r) orelse (C =:= $s) orelse (C =:= $t) orelse + (C =:= $u) orelse (C =:= $v) orelse (C =:= $w) orelse (C =:= $x) orelse + (C =:= $y) orelse (C =:= $z) orelse (C =:= $~) +)). + +urldecode(Binary) -> + skip_dec(Binary, Binary, 0). + +%% This functions helps avoid a binary allocation when +%% there is nothing to decode. +skip_dec(Binary, Orig, Len) -> + case Binary of + <> + when ?IS_PLAIN(C1) andalso ?IS_PLAIN(C2) + andalso ?IS_PLAIN(C3) andalso ?IS_PLAIN(C4) -> + skip_dec(Rest, Orig, Len + 4); + _ -> + dec(Binary, [], Orig, 0, Len) + end. + +-dialyzer({no_improper_lists, [dec/5]}). +%% This clause helps speed up decoding of highly encoded values. +dec(<<$%, H1, L1, $%, H2, L2, $%, H3, L3, $%, H4, L4, Rest/bits>>, Acc, Orig, Skip, Len) -> + C1 = ?UNHEX(H1, L1), + C2 = ?UNHEX(H2, L2), + C3 = ?UNHEX(H3, L3), + C4 = ?UNHEX(H4, L4), + case Len of + 0 -> + dec(Rest, [Acc|<>], Orig, Skip + 12, 0); + _ -> + Part = binary_part(Orig, Skip, Len), + dec(Rest, [Acc, Part|<>], Orig, Skip + Len + 12, 0) + end; +dec(<<$%, H, L, Rest/bits>>, Acc, Orig, Skip, Len) -> + C = ?UNHEX(H, L), + case Len of + 0 -> + dec(Rest, [Acc|<>], Orig, Skip + 3, 0); + _ -> + Part = binary_part(Orig, Skip, Len), + dec(Rest, [Acc, Part|<>], Orig, Skip + Len + 3, 0) + end; +%% This clause helps speed up decoding of barely encoded values. +dec(<>, Acc, Orig, Skip, Len) + when ?IS_PLAIN(C1) andalso ?IS_PLAIN(C2) + andalso ?IS_PLAIN(C3) andalso ?IS_PLAIN(C4) -> + dec(Rest, Acc, Orig, Skip, Len + 4); +dec(<>, Acc, Orig, Skip, Len) when ?IS_PLAIN(C) -> + dec(Rest, Acc, Orig, Skip, Len + 1); +dec(<<>>, _, Orig, 0, _) -> + Orig; +dec(<<>>, Acc, _, _, 0) -> + iolist_to_binary(Acc); +dec(<<>>, Acc, Orig, Skip, Len) -> + Part = binary_part(Orig, Skip, Len), + iolist_to_binary([Acc|Part]); +dec(_, _, Orig, Skip, Len) -> + error({invalid_byte, binary:at(Orig, Skip + Len)}). diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl index eb7f8c98935e..b6c41a1c722e 100644 --- a/deps/rabbit/test/amqp_address_SUITE.erl +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -22,7 +22,7 @@ all() -> [ {group, v1_permitted}, - {group, v2} + {group, v1_denied} ]. groups() -> @@ -30,7 +30,7 @@ groups() -> {v1_permitted, [shuffle], common_tests() }, - {v2, [shuffle], + {v1_denied, [shuffle], [ target_queue_absent, source_queue_absent, @@ -70,7 +70,7 @@ end_per_suite(Config) -> init_per_group(Group, Config0) -> PermitV1 = case Group of v1_permitted -> true; - v2 -> false + v1_denied -> false end, Config = rabbit_ct_helpers:merge_app_env( Config0, @@ -97,14 +97,14 @@ end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). %% Test v2 target address -%% /exchange/:exchange/key/:routing-key +%% /e/:exchange/:routing-key target_exchange_routing_key(Config) -> XName = <<"πŸ‘‰"/utf8>>, RKey = <<"πŸ—οΈ"/utf8>>, target_exchange_routing_key0(XName, RKey, Config). %% Test v2 target address -%% /exchange/:exchange/key/:routing-key +%% /e/:exchange/:routing-key %% where both :exchange and :routing-key contains a "/" character. target_exchange_routing_key_with_slash(Config) -> XName = <<"my/exchange">>, @@ -112,14 +112,14 @@ target_exchange_routing_key_with_slash(Config) -> target_exchange_routing_key0(XName, RKey, Config). target_exchange_routing_key0(XName, RKey, Config) -> - TargetAddr = <<"/exchange/", XName/binary, "/key/", RKey/binary>>, + TargetAddr = rabbitmq_amqp_address:exchange(XName, RKey), QName = atom_to_binary(?FUNCTION_NAME), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, RKey, #{}), - SrcAddr = <<"/queue/", QName/binary>>, + SrcAddr = rabbitmq_amqp_address:queue(QName), {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), @@ -141,17 +141,17 @@ target_exchange_routing_key0(XName, RKey, Config) -> ok = cleanup(Init). %% Test v2 target address -%% /exchange/:exchange/key/ +%% /e/:exchange/ %% Routing key is empty. target_exchange_routing_key_empty(Config) -> XName = <<"amq.fanout">>, + TargetAddr = rabbitmq_amqp_address:exchange(XName, <<>>), QName = atom_to_binary(?FUNCTION_NAME), - TargetAddr = <<"/exchange/", XName/binary, "/key/">>, Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, <<"ignored">>, #{}), - SrcAddr = <<"/queue/", QName/binary>>, + SrcAddr = rabbitmq_amqp_address:queue(QName), {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), @@ -167,17 +167,17 @@ target_exchange_routing_key_empty(Config) -> ok = cleanup(Init). %% Test v2 target address -%% /exchange/:exchange +%% /e/:exchange %% Routing key is empty. target_exchange(Config) -> XName = <<"amq.fanout">>, - TargetAddr = <<"/exchange/", XName/binary>>, + TargetAddr = rabbitmq_amqp_address:exchange(XName), QName = atom_to_binary(?FUNCTION_NAME), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, <<"ignored">>, #{}), - SrcAddr = <<"/queue/", QName/binary>>, + SrcAddr = rabbitmq_amqp_address:queue(QName), {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, SrcAddr), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, TargetAddr), @@ -193,11 +193,11 @@ target_exchange(Config) -> ok = cleanup(Init). %% Test v2 target address -%% /exchange/:exchange +%% /e/:exchange %% where the target exchange does not exist. target_exchange_absent(Config) -> XName = <<"🎈"/utf8>>, - TargetAddr = <<"/exchange/", XName/binary>>, + TargetAddr = rabbitmq_amqp_address:exchange(XName), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -220,20 +220,20 @@ target_exchange_absent(Config) -> ok = amqp10_client:close_connection(Connection). %% Test v2 target and source address -%% /queue/:queue +%% /q/:queue queue(Config) -> QName = <<"🎈"/utf8>>, queue0(QName, Config). %% Test v2 target and source address -%% /queue/:queue +%% /q/:queue %% where :queue contains a "/" character. queue_with_slash(Config) -> QName = <<"my/queue">>, queue0(QName, Config). queue0(QName, Config) -> - Addr = <<"/queue/", QName/binary>>, + Addr = rabbitmq_amqp_address:queue(QName), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), @@ -252,11 +252,11 @@ queue0(QName, Config) -> ok = cleanup(Init). %% Test v2 target address -%% /queue/:queue +%% /q/:queue %% where the target queue does not exist. target_queue_absent(Config) -> QName = <<"🎈"/utf8>>, - TargetAddr = <<"/queue/", QName/binary>>, + TargetAddr = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -279,15 +279,15 @@ target_queue_absent(Config) -> ok = amqp10_client:close_connection(Connection). %% Test v2 target address 'null' and 'to' -%% /exchange/:exchange/key/:routing-key +%% /e/:exchange/:routing-key %% with varying routing keys. target_per_message_exchange_routing_key(Config) -> QName = atom_to_binary(?FUNCTION_NAME), DirectX = <<"amq.direct">>, RKey1 = <<"πŸ—οΈ1"/utf8>>, RKey2 = <<"πŸ—οΈ2"/utf8>>, - To1 = <<"/exchange/", DirectX/binary, "/key/", RKey1/binary>>, - To2 = <<"/exchange/", DirectX/binary, "/key/", RKey2/binary>>, + To1 = rabbitmq_amqp_address:exchange(DirectX, RKey1), + To2 = rabbitmq_amqp_address:exchange(DirectX, RKey2), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), @@ -315,13 +315,13 @@ target_per_message_exchange_routing_key(Config) -> ok = cleanup(Init). %% Test v2 target address 'null' and 'to' -%% /exchange/:exchange +%% /e/:exchange %% with varying exchanges. target_per_message_exchange(Config) -> XFanout = <<"amq.fanout">>, XHeaders = <<"amq.headers">>, - To1 = <<"/exchange/", XFanout/binary>>, - To2 = <<"/exchange/", XHeaders/binary>>, + To1 = rabbitmq_amqp_address:exchange(XFanout), + To2 = rabbitmq_amqp_address:exchange(XHeaders), QName = atom_to_binary(?FUNCTION_NAME), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), @@ -349,14 +349,14 @@ target_per_message_exchange(Config) -> ok = cleanup(Init). %% Test v2 target address 'null' and 'to' -%% /queue/:queue +%% /q/:queue target_per_message_queue(Config) -> Q1 = <<"q1">>, Q2 = <<"q2">>, Q3 = <<"q3">>, - To1 = <<"/queue/", Q1/binary>>, - To2 = <<"/queue/", Q2/binary>>, - To3 = <<"/queue/", Q3/binary>>, + To1 = rabbitmq_amqp_address:queue(Q1), + To2 = rabbitmq_amqp_address:queue(Q2), + To3 = rabbitmq_amqp_address:queue(Q3), Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, Q1, #{}), @@ -414,16 +414,37 @@ bad_v2_addresses() -> <<"myqueue">>, <<"/queue">>, %% bad v2 target addresses + <<>>, + <<0>>, + <<"/">>, + <<"//">>, + <<"/q">>, + <<"/q/">>, <<"/queue/">>, + <<"/e">>, + %% default exchange in v2 target address is disallowed + <<"/e/">>, + <<"/e//">>, + <<"/e//mykey">>, + <<"/e/amq.default">>, + <<"/e/amq.default/">>, + <<"/e/amq.default/mykey">>, <<"/ex/βœ‹"/utf8>>, <<"/exchange">>, - %% default exchange in v2 target address is disallowed <<"/exchange/">>, <<"/exchange/amq.default">>, <<"/exchange//key/">>, <<"/exchange//key/mykey">>, <<"/exchange/amq.default/key/">>, - <<"/exchange/amq.default/key/mykey">> + <<"/exchange/amq.default/key/mykey">>, + %% The following addresses should be percent encoded, but aren't. + <<"/q/missing%encoding">>, + <<"/q/missing/encoding">>, + <<"/q/βœ‹"/utf8>>, + <<"/e/missing%encoding">>, + <<"/e/missing/encoding/routingkey">>, + <<"/e/exchange/missing%encoding">>, + <<"/e/βœ‹"/utf8>> ]. %% Test v2 target address 'null' with an invalid 'to' addresses. @@ -458,7 +479,7 @@ target_per_message_bad_to_address0(Address, Config) -> target_per_message_exchange_absent(Config) -> Init = {_, LinkPair = #link_pair{session = Session}} = init(Config), XName = <<"🎈"/utf8>>, - Address = <<"/exchange/", XName/binary>>, + Address = rabbitmq_amqp_address:exchange(XName), ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{}), {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, null), ok = wait_for_credit(Sender), @@ -514,11 +535,11 @@ target_bad_address0(TargetAddress, Config) -> ok = amqp10_client:close_connection(Connection). %% Test v2 source address -%% /queue/:queue +%% /q/:queue %% where the source queue does not exist. source_queue_absent(Config) -> QName = <<"🎈"/utf8>>, - SourceAddr = <<"/queue/", QName/binary>>, + SourceAddr = rabbitmq_amqp_address:queue(QName), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index 69f44bedf818..0ff70bf0c520 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -413,7 +413,7 @@ v1_attach_target_internal_exchange(Config) -> attach_source_queue(Config) -> {Conn, Session, LinkPair} = init_pair(Config), QName = <<"🍿"/utf8>>, - Address = <<"/queue/", QName/binary>>, + Address = rabbitmq_amqp_address:queue(QName), %% missing read permission to queue ok = set_permissions(Config, QName, <<>>, <<>>), @@ -433,8 +433,8 @@ attach_source_queue(Config) -> attach_target_exchange(Config) -> XName = <<"amq.fanout">>, - Address1 = <<"/exchange/", XName/binary>>, - Address2 = <<"/exchange/", XName/binary, "/key/some-key", XName/binary>>, + Address1 = rabbitmq_amqp_address:exchange(XName), + Address2 = rabbitmq_amqp_address:exchange(XName, <<"some-key">>), OpnConf = connection_config(Config), {ok, Connection} = amqp10_client:open_connection(OpnConf), @@ -457,13 +457,14 @@ attach_target_exchange(Config) -> ok = amqp10_client:close_connection(Connection). attach_target_topic_exchange(Config) -> - TargetAddress = <<"/exchange/amq.topic/key/test vhost.test user.a.b">>, + TargetAddress = rabbitmq_amqp_address:exchange( + <<"amq.topic">>, <<"test vhost.test user.a.b">>), ok = send_to_topic(TargetAddress, Config). attach_target_queue(Config) -> {Conn, Session, LinkPair} = init_pair(Config), QName = <<"🍿"/utf8>>, - Address = <<"/queue/", QName/binary>>, + Address = rabbitmq_amqp_address:queue(QName), %% missing write permission to default exchange ok = set_permissions(Config, QName, <<>>, <<>>), @@ -480,8 +481,8 @@ attach_target_queue(Config) -> target_per_message_exchange(Config) -> TargetAddress = null, - To1 = <<"/exchange/amq.fanout">>, - To2 = <<"/queue/q1">>, + To1 = rabbitmq_amqp_address:exchange(<<"amq.fanout">>), + To2 = rabbitmq_amqp_address:queue(<<"q1">>), %% missing write permission to default exchange ok = set_permissions(Config, <<>>, <<"amq.fanout">>, <<>>), @@ -516,7 +517,7 @@ target_per_message_internal_exchange(Config) -> XName = <<"my internal exchange">>, XProps = #{internal => true}, TargetAddress = null, - To = <<"/exchange/", XName/binary>>, + To = rabbitmq_amqp_address:exchange(XName), ok = set_permissions(Config, XName, XName, <<>>), {Conn1, Session1, LinkPair1} = init_pair(Config), @@ -541,8 +542,8 @@ target_per_message_internal_exchange(Config) -> target_per_message_topic(Config) -> TargetAddress = null, - To1 = <<"/exchange/amq.topic/key/.a">>, - To2 = <<"/exchange/amq.topic/key/.a.b">>, + To1 = rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<".a">>), + To2 = rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<".a.b">>), User = ?config(test_user, Config), Vhost = ?config(test_vhost, Config), ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, Vhost), diff --git a/deps/rabbitmq_amqp_client/app.bzl b/deps/rabbitmq_amqp_client/app.bzl index 6f3e3c4c0446..d80a6dafe4f5 100644 --- a/deps/rabbitmq_amqp_client/app.bzl +++ b/deps/rabbitmq_amqp_client/app.bzl @@ -8,7 +8,7 @@ def all_beam_files(name = "all_beam_files"): ) erlang_bytecode( name = "other_beam", - srcs = ["src/rabbitmq_amqp_client.erl"], + srcs = ["src/rabbitmq_amqp_address.erl", "src/rabbitmq_amqp_client.erl"], hdrs = [":public_and_private_hdrs"], app_name = "rabbitmq_amqp_client", dest = "ebin", @@ -19,7 +19,7 @@ def all_beam_files(name = "all_beam_files"): def all_srcs(name = "all_srcs"): filegroup( name = "srcs", - srcs = ["src/rabbitmq_amqp_client.erl"], + srcs = ["src/rabbitmq_amqp_address.erl", "src/rabbitmq_amqp_client.erl"], ) filegroup(name = "private_hdrs") filegroup( @@ -47,7 +47,7 @@ def all_test_beam_files(name = "all_test_beam_files"): erlang_bytecode( name = "test_other_beam", testonly = True, - srcs = ["src/rabbitmq_amqp_client.erl"], + srcs = ["src/rabbitmq_amqp_address.erl", "src/rabbitmq_amqp_client.erl"], hdrs = [":public_and_private_hdrs"], app_name = "rabbitmq_amqp_client", dest = "test", diff --git a/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_address.erl b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_address.erl new file mode 100644 index 000000000000..a0c07d4fde57 --- /dev/null +++ b/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_address.erl @@ -0,0 +1,30 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term β€œBroadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +-module(rabbitmq_amqp_address). + +-export[exchange/1, + exchange/2, + queue/1]. + +-spec exchange(unicode:unicode_binary()) -> + unicode:unicode_binary(). +exchange(ExchangeName) -> + ExchangeNameQuoted = uri_string:quote(ExchangeName), + <<"/e/", ExchangeNameQuoted/binary>>. + +-spec exchange(unicode:unicode_binary(), unicode:unicode_binary()) -> + unicode:unicode_binary(). +exchange(ExchangeName, RoutingKey) -> + ExchangeNameQuoted = uri_string:quote(ExchangeName), + RoutingKeyQuoted = uri_string:quote(RoutingKey), + <<"/e/", ExchangeNameQuoted/binary, "/", RoutingKeyQuoted/binary>>. + +-spec queue(unicode:unicode_binary()) -> + unicode:unicode_binary(). +queue(QueueName) -> + QueueNameQuoted = uri_string:quote(QueueName), + <<"/q/", QueueNameQuoted/binary>>. diff --git a/deps/rabbitmq_mqtt/src/mc_mqtt.erl b/deps/rabbitmq_mqtt/src/mc_mqtt.erl index b6304bf50780..802efc6576e9 100644 --- a/deps/rabbitmq_mqtt/src/mc_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/mc_mqtt.erl @@ -87,12 +87,14 @@ convert_from(mc_amqp, Sections, Env) -> #'v1_0.properties'{reply_to = {utf8, Address}} -> MqttX = maps:get(mqtt_x, Env, ?DEFAULT_MQTT_EXCHANGE), case Address of - <<"/exchange/", - MqttX:(byte_size(MqttX))/binary, - "/key/", - RoutingKey/binary>> -> - MqttTopic = rabbit_mqtt_util:amqp_to_mqtt(RoutingKey), - #{'Response-Topic' => MqttTopic}; + <<"/e/", MqttX:(byte_size(MqttX))/binary, "/", RoutingKeyQuoted/binary>> -> + try rabbit_uri:urldecode(RoutingKeyQuoted) of + RoutingKey -> + MqttTopic = rabbit_mqtt_util:amqp_to_mqtt(RoutingKey), + #{'Response-Topic' => MqttTopic} + catch error:_ -> + #{} + end; _ -> #{} end; @@ -257,7 +259,11 @@ convert_to(mc_amqp, #mqtt_msg{qos = Qos, #{'Response-Topic' := MqttTopic} -> Exchange = maps:get(mqtt_x, Env, ?DEFAULT_MQTT_EXCHANGE), Topic = rabbit_mqtt_util:mqtt_to_amqp(MqttTopic), - Address = <<"/exchange/", Exchange/binary, "/key/", Topic/binary>>, + TopicQuoted = uri_string:quote(Topic), + %% We assume here that Exchange doesn't contain characters + %% that need to be quoted. This is a reasonable assumption + %% given that amq.topic is the default MQTT topic exchange. + Address = <<"/e/", Exchange/binary, "/", TopicQuoted/binary>>, {utf8, Address}; _ -> undefined diff --git a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl index d72d1fed5903..9ff5d0f61219 100644 --- a/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl @@ -236,17 +236,32 @@ amqp_to_mqtt_reply_to(_Config) -> Val = amqp_value({utf8, <<"hey">>}), Key = mqtt_x, Env = #{Key => <<"mqtt-topic-exchange">>}, - AmqpProps1 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/mqtt-topic-exchange/key/my.routing.key">>}}, + + AmqpProps1 = #'v1_0.properties'{reply_to = {utf8, <<"/e/mqtt-topic-exchange/my.routing.key">>}}, #mqtt_msg{props = Props1} = amqp_to_mqtt([AmqpProps1, Val], Env), ?assertEqual({ok, <<"my/routing/key">>}, maps:find('Response-Topic', Props1)), - AmqpProps2 = #'v1_0.properties'{reply_to = {utf8, <<"/exchange/NON-mqtt-topic-exchange/key/my.routing.key">>}}, + AmqpProps2 = #'v1_0.properties'{reply_to = {utf8, <<"/e/NON-mqtt-topic-exchange/my.routing.key">>}}, #mqtt_msg{props = Props2} = amqp_to_mqtt([AmqpProps2, Val]), ?assertEqual(error, maps:find('Response-Topic', Props2)), - ok. + RoutingKey = <<"my.sp%$@cial.routing.key">>, + %% The AMQP client must percent encode the AMQP reply_to address URI. We expect the + %% AMQP -> MQTT conversion to percent decode because an MQTT response topic is not percent encoded. + RoutingKeyQuoted = uri_string:quote(RoutingKey), + AmqpProps3 = #'v1_0.properties'{reply_to = {utf8, <<"/e/mqtt-topic-exchange/", RoutingKeyQuoted/binary>>}}, + #mqtt_msg{props = Props3} = amqp_to_mqtt([AmqpProps3, Val], Env), + ?assertEqual({ok, <<"my/sp%$@cial/routing/key">>}, + maps:find('Response-Topic', Props3)), + + %% If the AMQP client did not percent encode the AMQP reply_to address URI as required, + %% then the reply_to should be ignored by the conversion. + AmqpProps4 = #'v1_0.properties'{reply_to = {utf8, <<"/e/mqtt-topic-exchange/", RoutingKey/binary>>}}, + #mqtt_msg{props = Props4} = amqp_to_mqtt([AmqpProps4, Val], Env), + ?assertEqual(error, + maps:find('Response-Topic', Props4)). amqp_to_mqtt_footer(_Config) -> Body = <<"hey">>, diff --git a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl index 57fad5e727b2..4c8ba680b23e 100644 --- a/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl @@ -186,12 +186,13 @@ mqtt_amqp_mqtt(Config) -> ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, <<"amq.topic">>, <<"topic.1">>, #{}), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), {ok, Receiver} = amqp10_client:attach_receiver_link( - Session1, <<"test-receiver">>, <<"/queue/", QName/binary>>, + Session1, <<"test-receiver">>, + rabbitmq_amqp_address:queue(QName), unsettled, configuration), %% MQTT 5.0 to AMQP 1.0 C = connect(ClientId, Config), - MqttResponseTopic = <<"response/topic">>, + MqttResponseTopic = <<"response/topic/πŸ₯•"/utf8>>, {ok, _, [1]} = emqtt:subscribe(C, #{'Subscription-Identifier' => 999}, [{MqttResponseTopic, [{qos, 1}]}]), Correlation = <<"some correlation ID">>, @@ -233,7 +234,9 @@ mqtt_amqp_mqtt(Config) -> #{correlation_id := Correlation, content_type := ContentType, reply_to := ReplyToAddress} = amqp10_msg:properties(Msg1), - ?assertEqual(<<"/exchange/amq.topic/key/response.topic">>, ReplyToAddress), + ExpectedReplyToAddress = rabbitmq_amqp_address:exchange( + <<"amq.topic">>, <<"response.topic.πŸ₯•"/utf8>>), + ?assertEqual(ExpectedReplyToAddress, ReplyToAddress), ?assertEqual(RequestPayload, amqp10_msg:body_bin(Msg1)), @@ -303,12 +306,16 @@ amqp_mqtt_amqp(Config) -> {ok, Session} = amqp10_client:begin_session(Connection), {ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"pair">>), {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), - ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, <<"amq.topic">>, <<"t.2">>, #{}), + ok = rabbitmq_amqp_client:bind_queue(LinkPair, QName, <<"amq.topic">>, <<"[.]">>, #{}), ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), - {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, <<"/queue/", QName/binary>>), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, rabbitmq_amqp_address:queue(QName)), %% AMQP 1.0 to MQTT 5.0 - {ok, Sender} = amqp10_client:attach_sender_link(Session, <<"sender">>, <<"/exchange/amq.topic/key/t.1">>), + {ok, Sender} = amqp10_client:attach_sender_link( + Session, + <<"sender">>, + rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"t.1">>)), receive {amqp10_event, {link, Sender, credited}} -> ok after 2000 -> ct:fail(credited_timeout) end, @@ -318,11 +325,11 @@ amqp_mqtt_amqp(Config) -> #{durable => true}, amqp10_msg:set_properties( #{correlation_id => Correlation, - reply_to => <<"/exchange/amq.topic/key/t.2">>}, + reply_to => rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"[.]">>)}, amqp10_msg:new(<<>>, RequestBody, true))), ok = amqp10_client:send_msg(Sender, Msg1), - ResponseTopic = <<"t/2">>, + ResponseTopic = <<"[/]">>, receive {publish, MqttMsg} -> ct:pal("Received MQTT message:~n~p", [MqttMsg]), #{client_pid := C, @@ -385,7 +392,9 @@ amqp_mqtt(Qos, Config) -> {ok, Session} = amqp10_client:begin_session(Connection), {ok, Sender} = amqp10_client:attach_sender_link( - Session, <<"sender">>, <<"/exchange/amq.topic/key/my.topic">>), + Session, + <<"sender">>, + rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"my.topic">>)), receive {amqp10_event, {link, Sender, credited}} -> ok after 2000 -> ct:fail(credited_timeout) end, @@ -638,12 +647,13 @@ mqtt_stream(Config) -> <<"x-routing-key">> => <<"my.topic">>, <<"x-key">> => <<"val">>}, amqp10_msg:message_annotations(Msg)), - ?assertEqual(#{correlation_id => Correlation, - content_type => ContentType, - %% We expect that reply_to contains a valid AMQP 1.0 address, - %% and that the topic format got translated from MQTT to AMQP 0.9.1. - reply_to => <<"/exchange/amq.topic/key/response.topic">>}, - amqp10_msg:properties(Msg)), + ?assertEqual( + #{correlation_id => Correlation, + content_type => ContentType, + %% We expect that reply_to contains a valid AMQP 1.0 address, + %% and that the topic format got translated from MQTT to AMQP 0.9.1. + reply_to => rabbitmq_amqp_address:exchange(<<"amq.topic">>, <<"response.topic">>)}, + amqp10_msg:properties(Msg)), ?assertEqual(#{<<"rabbitπŸ‡"/utf8>> => <<"carrotπŸ₯•"/utf8>>, <<"key">> => <<"val">>}, amqp10_msg:application_properties(Msg)), diff --git a/moduleindex.yaml b/moduleindex.yaml index 2eb563069e53..e847db8bdbb6 100755 --- a/moduleindex.yaml +++ b/moduleindex.yaml @@ -723,6 +723,7 @@ rabbit: - rabbit_tracking - rabbit_tracking_store - rabbit_upgrade_preparation +- rabbit_uri - rabbit_variable_queue - rabbit_version - rabbit_vhost @@ -807,6 +808,7 @@ rabbit_common: - worker_pool_sup - worker_pool_worker rabbitmq_amqp_client: +- rabbitmq_amqp_address - rabbitmq_amqp_client rabbitmq_amqp1_0: - rabbitmq_amqp1_0_noop