diff --git a/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTreeTest.java b/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTreeTest.java index 7431792a1..a5fb107a8 100644 --- a/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTreeTest.java +++ b/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTreeTest.java @@ -36,20 +36,60 @@ class MqttSubscriptionFlowTreeTest extends MqttSubscriptionFlowsTest { @ParameterizedTest @CsvSource({ + // split single level before and after "unsubscribe, test/topic1, test/topic2, test/topic3, test/topic1, test/topic2, test/topic3", + // split single level before, single level wildcard after "unsubscribe, test/+, test/topic2, test/topic3, test/topic1, test/topic2, test/topic3", + // split single level before, single level wildcard after, fuse different order + "unsubscribe, test/topic1, test/topic2, test/+, test/topic1, test/topic2, test/topic3", + // split multiple levels before, single level after "unsubscribe, test/topic/filter1, test/topic/filter2, test/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3", + // split multiple levels before, single level wildcard after "unsubscribe, test/topic/+, test/topic/filter2, test/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3", + // split multiple levels before, single level wildcard after, fuse different order + "unsubscribe, test/topic/filter1, test/topic/filter2, test/topic/+, test/topic/filter1, test/topic/filter2, test/topic/filter3", + // split single level before, multiple levels after "unsubscribe, test/topic1/filter, test/topic2/filter, test/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter", + // split single level before, single level wildcard with multiple levels after "unsubscribe, test/+/filter, test/topic2/filter, test/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter", + // split single level before, single level wildcard with multiple levels after, fuse different order + "unsubscribe, test/topic1/filter, test/topic2/filter, test/+/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter", + // split multiple levels before, multiple levels after + "unsubscribe, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc", + // split multi levels before, single level wildcard with multiple levels after + "unsubscribe, test/topic/+/abc, test/topic/filter2/abc, test/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc", + // split multi levels before, single level wildcard with multiple levels after, fuse different order + "unsubscribe, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/+/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc", + // split single level wildcard before, single level after + "unsubscribe, +/topic1, +/topic2, +/topic3, test/topic1, test/topic2, test/topic3", + // split single level wildcard before, multiple levels after + "unsubscribe, +/topic1/filter, +/topic2/filter, +/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter", + // split single level wildcard with multiple levels before, single level after + "unsubscribe, +/topic/filter1, +/topic/filter2, +/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3", + // split single level wildcard with multiple levels before, multiple levels after + "unsubscribe, +/topic/filter1/abc, +/topic/filter2/abc, +/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc", + // split linear chain "unsubscribe, test/topic/filter, test/topic//filter, test/topic///filter, test/topic/filter, test/topic//filter, test/topic///filter", + // split linear chain, do not fuse + "unsubscribe, test/topic/filter, test/topic//filter, test/topic///filter, test/topic///filter, test/topic//filter, test/topic/filter", "remove, test/topic1, test/topic2, test/topic3, test/topic1, test/topic2, test/topic3", "remove, test/+, test/topic2, test/topic3, test/topic1, test/topic2, test/topic3", + "remove, test/topic1, test/topic2, test/+, test/topic1, test/topic2, test/topic3", "remove, test/topic/filter1, test/topic/filter2, test/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3", "remove, test/topic/+, test/topic/filter2, test/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3", + "remove, test/topic/filter1, test/topic/filter2, test/topic/+, test/topic/filter1, test/topic/filter2, test/topic/filter3", "remove, test/topic1/filter, test/topic2/filter, test/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter", "remove, test/+/filter, test/topic2/filter, test/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter", + "remove, test/topic1/filter, test/topic2/filter, test/+/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter", + "remove, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc", + "remove, test/topic/+/abc, test/topic/filter2/abc, test/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc", + "remove, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/+/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc", + "remove, +/filter1, +/filter2, +/filter3, topic/filter1, topic/filter2, topic/filter3", + "remove, +/topic1/filter, +/topic2/filter, +/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter", + "remove, +/topic/filter1, +/topic/filter2, +/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3", + "remove, +/topic/filter1/abc, +/topic/filter2/abc, +/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc", "remove, test/topic/filter, test/topic//filter, test/topic///filter, test/topic/filter, test/topic//filter, test/topic///filter", + "remove, test/topic/filter, test/topic//filter, test/topic///filter, test/topic///filter, test/topic//filter, test/topic/filter", }) void branching_compaction( final @NotNull String compactOperation, final @NotNull String filter1, final @NotNull String filter2, diff --git a/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowsTest.java b/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowsTest.java index a9904ec8b..5a9202a4a 100644 --- a/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowsTest.java +++ b/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowsTest.java @@ -23,6 +23,7 @@ import com.hivemq.client.internal.util.collections.HandleList; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.converter.ArgumentConversionException; import org.junit.jupiter.params.converter.ConvertWith; @@ -32,8 +33,7 @@ import java.util.function.Supplier; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; /** * @author Silvio Giebl @@ -270,6 +270,23 @@ void cancel_notPresentFlows_areIgnored(final @NotNull String topic, final @NotNu assertEquals(ImmutableSet.of(flow1), ImmutableSet.copyOf(matching)); } + @Test + void cancel_partiallyUnsubscribedFlow() { + final MqttSubscribedPublishFlow flow = mockSubscriptionFlow("test/topic(2)"); + flows.subscribe(MqttTopicFilterImpl.of("test/topic"), flow); + flows.subscribe(MqttTopicFilterImpl.of("test/topic2"), flow); + + flows.unsubscribe(MqttTopicFilterImpl.of("test/topic"), null); + flows.cancel(flow); + + final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows(); + flows.findMatching(MqttTopicImpl.of("test/topic"), matching); + assertFalse(matching.subscriptionFound); + flows.findMatching(MqttTopicImpl.of("test/topic2"), matching); + assertTrue(matching.subscriptionFound); + assertTrue(matching.isEmpty()); + } + @ParameterizedTest @CsvSource({ "1/a, 1/a, 2/a, 2/a", "1/a, 1/+, 2/a, 2/+", "1/a, 1/#, 2/a, 2/#", "1/a/b, 1/a/b, 2/a/b, 2/a/b", @@ -364,6 +381,57 @@ void remove_doesNotUnsubscribe_noFlow(final @NotNull String topic, final @NotNul assertTrue(matching.isEmpty()); } + @ParameterizedTest + @CsvSource({"a/b, a/b/c, +/b/c, +/+/+", "a/b/c/d, a/b/c/d/e, +/b//d/ec, +/+/+/+/+"}) + void findMatching_matchingMultipleButNotAllLevels( + final @NotNull String topic, final @NotNull String filter1, final @NotNull String filter2, + final @NotNull String filter3) { + + flows.subscribe(MqttTopicFilterImpl.of(filter1), null); + flows.subscribe(MqttTopicFilterImpl.of(filter2), null); + flows.subscribe(MqttTopicFilterImpl.of(filter3), null); + + final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows(); + flows.findMatching(MqttTopicImpl.of(topic), matching); + assertFalse(matching.subscriptionFound); + assertTrue(matching.isEmpty()); + } + + @Test + void clear() { + final MqttSubscribedPublishFlow flow1 = mockSubscriptionFlow("test/topic/filter"); + final MqttSubscribedPublishFlow flow2 = mockSubscriptionFlow("test2/topic/filter"); + final MqttSubscribedPublishFlow flow3 = mockSubscriptionFlow("test/topic2/filter"); + final MqttSubscribedPublishFlow flow4 = mockSubscriptionFlow("test/topic/filter2"); + final MqttSubscribedPublishFlow flow5 = mockSubscriptionFlow("+/topic"); + final MqttSubscribedPublishFlow flow6 = mockSubscriptionFlow("topic/#"); + flows.subscribe(MqttTopicFilterImpl.of(flow1.toString()), flow1); + flows.subscribe(MqttTopicFilterImpl.of(flow2.toString()), flow2); + flows.subscribe(MqttTopicFilterImpl.of(flow3.toString()), flow3); + flows.subscribe(MqttTopicFilterImpl.of(flow4.toString()), flow4); + flows.subscribe(MqttTopicFilterImpl.of(flow5.toString()), flow5); + flows.subscribe(MqttTopicFilterImpl.of(flow6.toString()), flow6); + + final Exception cause = new Exception("test"); + flows.clear(cause); + verify(flow1).onError(cause); + verify(flow2).onError(cause); + verify(flow3).onError(cause); + verify(flow4).onError(cause); + verify(flow5).onError(cause); + verify(flow6).onError(cause); + + final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows(); + flows.findMatching(MqttTopicImpl.of("test/topic"), matching); + flows.findMatching(MqttTopicImpl.of("test/topic/filter"), matching); + flows.findMatching(MqttTopicImpl.of("test2/topic/filter"), matching); + flows.findMatching(MqttTopicImpl.of("test/topic2/filter"), matching); + flows.findMatching(MqttTopicImpl.of("test/topic/filter2"), matching); + flows.findMatching(MqttTopicImpl.of("abc/topic"), matching); + flows.findMatching(MqttTopicImpl.of("topic/abc"), matching); + assertFalse(matching.subscriptionFound); + } + @NotNull private static MqttSubscribedPublishFlow mockSubscriptionFlow(final @NotNull String name) { final MqttSubscribedPublishFlow flow = mock(MqttSubscribedPublishFlow.class);