diff --git a/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicIterator.java b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicIterator.java
new file mode 100644
index 000000000..83864eff4
--- /dev/null
+++ b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicIterator.java
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2018 dc-square and the HiveMQ MQTT Client Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.hivemq.client.internal.mqtt.datatypes;
+
+import com.hivemq.client.internal.util.ByteArrayUtil;
+import com.hivemq.client.mqtt.datatypes.MqttTopic;
+import com.hivemq.client.mqtt.datatypes.MqttTopicFilter;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator for a topic or topic filter.
+ *
+ * equals and hashCode match the current level.
+ *
+ * @author Silvio Giebl
+ */
+public class MqttTopicIterator extends MqttTopicLevel {
+
+ public static @NotNull MqttTopicIterator of(final @NotNull MqttTopicImpl topic) {
+ final byte[] binary = topic.toBinary();
+ return new MqttTopicIterator(binary, -1, -1, binary.length);
+ }
+
+ public static @NotNull MqttTopicIterator of(final @NotNull MqttTopicFilterImpl topicFilter) {
+ final byte[] binary = topicFilter.toBinary();
+ final int start = topicFilter.getFilterByteStart() - 1;
+ return new MqttTopicIterator(
+ binary, start, start, topicFilter.containsMultiLevelWildcard() ? (binary.length - 2) : binary.length);
+ }
+
+ private int start;
+ private int end;
+ private final int allEnd;
+
+ private MqttTopicIterator(final @NotNull byte[] array, final int start, final int end, final int allEnd) {
+ super(array);
+ this.start = start;
+ this.end = end;
+ this.allEnd = allEnd;
+ }
+
+ @Override
+ protected int getStart() {
+ return start;
+ }
+
+ @Override
+ protected int getEnd() {
+ return end;
+ }
+
+ public boolean hasNext() {
+ return end != allEnd;
+ }
+
+ public boolean hasMultiLevelWildcard() {
+ return allEnd != array.length;
+ }
+
+ public @NotNull MqttTopicIterator fork() {
+ return new MqttTopicIterator(array, start, end, allEnd);
+ }
+
+ public @NotNull MqttTopicLevel next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ start = end + 1;
+ end = ByteArrayUtil.indexOf(array, start, (byte) MqttTopic.TOPIC_LEVEL_SEPARATOR);
+ return this;
+ }
+
+ @Override
+ public @NotNull MqttTopicLevel trim() {
+ if (!hasNext()) {
+ return MqttTopicLevel.of(array, start, end);
+ }
+ final int start = this.start;
+ final int end = this.end;
+ this.start = this.end = allEnd;
+ return new MqttTopicLevels(Arrays.copyOfRange(array, start, allEnd), end - start);
+ }
+
+ public boolean forwardIfEqual(final @NotNull MqttTopicLevels levels) {
+ final byte[] levelsArray = levels.getArray();
+ final int levelsEnd = levels.getEnd();
+ final int to = end + levelsArray.length - levelsEnd;
+ if ((to <= allEnd) && ((to == allEnd) || (array[to] == MqttTopic.TOPIC_LEVEL_SEPARATOR)) &&
+ ByteArrayUtil.equals(array, end + 1, to, levelsArray, levelsEnd + 1, levelsArray.length)) {
+ start = end = to;
+ return true;
+ }
+ return false;
+ }
+
+ public int forwardWhileEqual(final @NotNull MqttTopicLevels levels) {
+ if (!hasNext()) {
+ return levels.getEnd();
+ }
+ int branchIndex = end;
+ int levelsBranchIndex = levels.getEnd();
+ int index = branchIndex + 1;
+ int levelsIndex = levelsBranchIndex + 1;
+ final byte[] levelsArray = levels.getArray();
+ while (true) {
+ final boolean isEnd = index == allEnd;
+ final boolean isLevelsEnd = levelsIndex == levelsArray.length;
+ if (isLevelsEnd || isEnd) {
+ if ((isLevelsEnd || (levelsArray[levelsIndex] == MqttTopicImpl.TOPIC_LEVEL_SEPARATOR)) &&
+ (isEnd || (array[index] == MqttTopicImpl.TOPIC_LEVEL_SEPARATOR))) {
+ branchIndex = index;
+ levelsBranchIndex = levelsIndex;
+ }
+ break;
+ }
+ final byte lb = levelsArray[levelsIndex];
+ if (array[index] == lb) {
+ if (lb == MqttTopicImpl.TOPIC_LEVEL_SEPARATOR) {
+ branchIndex = index;
+ levelsBranchIndex = levelsIndex;
+ }
+ index++;
+ levelsIndex++;
+ } else {
+ break;
+ }
+ }
+ start = end = branchIndex;
+ return levelsBranchIndex;
+ }
+
+ public boolean forwardIfMatch(final @NotNull MqttTopicLevels levels) {
+ if (!hasNext()) {
+ return false;
+ }
+ int index = end + 1;
+ int levelsIndex = levels.getEnd() + 1;
+ final byte[] levelsArray = levels.getArray();
+ while (true) {
+ final boolean isEnd = index == allEnd;
+ final boolean isLevelsEnd = levelsIndex == levelsArray.length;
+ if (isLevelsEnd) {
+ if (isEnd || (array[index] == MqttTopicImpl.TOPIC_LEVEL_SEPARATOR)) {
+ start = end = index;
+ return true;
+ }
+ return false;
+ }
+ if (isEnd) {
+ return false;
+ }
+ final byte lb = levelsArray[levelsIndex];
+ if (array[index] == lb) {
+ index++;
+ levelsIndex++;
+ } else if (lb == MqttTopicFilter.SINGLE_LEVEL_WILDCARD) {
+ while ((index < allEnd) && (array[index] != MqttTopicImpl.TOPIC_LEVEL_SEPARATOR)) {
+ index++;
+ }
+ levelsIndex++;
+ } else {
+ return false;
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevel.java b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevel.java
index 966580653..def52c448 100644
--- a/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevel.java
+++ b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevel.java
@@ -18,69 +18,46 @@
package com.hivemq.client.internal.mqtt.datatypes;
import com.hivemq.client.internal.util.ByteArray;
-import com.hivemq.client.internal.util.ByteArrayUtil;
-import com.hivemq.client.mqtt.datatypes.MqttTopic;
import com.hivemq.client.mqtt.datatypes.MqttTopicFilter;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
import java.util.Arrays;
/**
+ * Single topic or topic filter level. May be the single level wildcard but must not be the multi level wildcard (the
+ * multi level wildcard does not represent a topic level).
+ *
* @author Silvio Giebl
*/
-public class MqttTopicLevel extends ByteArray.Range {
-
- public static final @NotNull ByteArray SINGLE_LEVEL_WILDCARD =
- new ByteArray(new byte[]{MqttTopicFilter.SINGLE_LEVEL_WILDCARD});
-
- public static @NotNull MqttTopicLevel root(final @NotNull MqttTopicImpl topic) {
- final byte[] binary = topic.toBinary();
- final int end = nextEnd(binary, 0);
- return new MqttTopicLevel(binary, 0, end);
- }
-
- public static @NotNull MqttTopicLevel root(final @NotNull MqttTopicFilterImpl topicFilter) {
- final byte[] binary = topicFilter.toBinary();
- final int start = topicFilter.getFilterByteStart();
- final int end = nextEnd(binary, start);
- return new MqttTopicLevel(binary, start, end);
- }
+public class MqttTopicLevel extends ByteArray {
- private static int nextEnd(final @NotNull byte[] array, final int start) {
- final int nextSeparator = ByteArrayUtil.indexOf(array, start, (byte) MqttTopic.TOPIC_LEVEL_SEPARATOR);
- return (nextSeparator == -1) ? array.length : nextSeparator;
- }
+ private static final @NotNull MqttTopicLevel SINGLE_LEVEL_WILDCARD =
+ new MqttTopicLevel(new byte[]{MqttTopicFilter.SINGLE_LEVEL_WILDCARD});
- private MqttTopicLevel(final @NotNull byte[] array, final int start, final int end) {
- super(array, start, end);
+ static @NotNull MqttTopicLevel of(final @NotNull byte[] array, final int start, final int end) {
+ if (isSingleLevelWildcard(array, start, end)) {
+ return MqttTopicLevel.SINGLE_LEVEL_WILDCARD;
+ }
+ return new MqttTopicLevel(Arrays.copyOfRange(array, start, end));
}
- public @Nullable MqttTopicLevel next() {
- if (end == array.length) {
- return null;
- }
- start = end + 1;
- end = nextEnd(array, start);
- return this;
+ private static boolean isSingleLevelWildcard(final @NotNull byte[] array, final int start, final int end) {
+ return ((end - start) == 1) && (array[start] == MqttTopicFilter.SINGLE_LEVEL_WILDCARD);
}
- public @NotNull ByteArray copy() {
- if (isSingleLevelWildcard()) {
- return SINGLE_LEVEL_WILDCARD;
- }
- return new ByteArray(Arrays.copyOfRange(array, start, end));
+ MqttTopicLevel(final @NotNull byte[] array) {
+ super(array);
}
- public @NotNull MqttTopicLevel fork() {
- return new MqttTopicLevel(array, start, end);
+ @NotNull byte[] getArray() {
+ return array;
}
public boolean isSingleLevelWildcard() {
- return (length() == 1) && (array[start] == MqttTopicFilter.SINGLE_LEVEL_WILDCARD);
+ return isSingleLevelWildcard(array, getStart(), getEnd());
}
- public boolean isMultiLevelWildcard() {
- return (length() == 1) && (array[start] == MqttTopicFilter.MULTI_LEVEL_WILDCARD);
+ public @NotNull MqttTopicLevel trim() {
+ return this;
}
}
diff --git a/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevels.java b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevels.java
new file mode 100644
index 000000000..586f15a16
--- /dev/null
+++ b/src/main/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevels.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2018 dc-square and the HiveMQ MQTT Client Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.hivemq.client.internal.mqtt.datatypes;
+
+import com.hivemq.client.internal.util.ByteArrayUtil;
+import com.hivemq.client.mqtt.datatypes.MqttTopic;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Arrays;
+
+/**
+ * Multiple (more than 1) topic or topic filter levels. May contain single level wildcard(s) but must not contain the
+ * multi level wildcard (the multi level wildcard does not represent a topic level).
+ *
+ * equals and hashCode match the first level.
+ *
+ * @author Silvio Giebl
+ */
+public class MqttTopicLevels extends MqttTopicLevel {
+
+ public static @NotNull MqttTopicLevels concat(
+ final @NotNull MqttTopicLevel level1, final @NotNull MqttTopicLevel level2) {
+
+ final byte[] array1 = level1.trim().getArray();
+ final byte[] array2 = level2.trim().getArray();
+ final byte[] array = new byte[array1.length + 1 + array2.length];
+ System.arraycopy(array1, 0, array, 0, array1.length);
+ array[array1.length] = MqttTopic.TOPIC_LEVEL_SEPARATOR;
+ System.arraycopy(array2, 0, array, array1.length + 1, array2.length);
+ return new MqttTopicLevels(array, level1.length());
+ }
+
+ private final int firstEnd;
+
+ MqttTopicLevels(final @NotNull byte[] array, final int firstEnd) {
+ super(array);
+ this.firstEnd = firstEnd;
+ }
+
+ @Override
+ protected int getEnd() {
+ return firstEnd;
+ }
+
+ public @NotNull MqttTopicLevel before(final int index) {
+ if (index == array.length) {
+ return this;
+ }
+ assert array[index] == MqttTopic.TOPIC_LEVEL_SEPARATOR;
+ if (index == firstEnd) {
+ return MqttTopicLevel.of(array, 0, firstEnd);
+ }
+ return new MqttTopicLevels(Arrays.copyOfRange(array, 0, index), firstEnd);
+ }
+
+ public @NotNull MqttTopicLevel after(final int index) {
+ assert array[index] == MqttTopic.TOPIC_LEVEL_SEPARATOR;
+ final int start = index + 1;
+ final int end = ByteArrayUtil.indexOf(array, start, (byte) MqttTopic.TOPIC_LEVEL_SEPARATOR);
+ if (end == array.length) {
+ return MqttTopicLevel.of(array, start, array.length);
+ }
+ return new MqttTopicLevels(Arrays.copyOfRange(array, start, array.length), end - start);
+ }
+}
diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlows.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlows.java
index e28996d07..6414c80df 100644
--- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlows.java
+++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlows.java
@@ -19,7 +19,6 @@
import com.hivemq.client.internal.annotations.NotThreadSafe;
import com.hivemq.client.internal.mqtt.datatypes.MqttTopicFilterImpl;
-import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl;
import com.hivemq.client.internal.mqtt.ioc.ClientScope;
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttStatefulSubscribe;
@@ -75,8 +74,9 @@ public void subAck(
final ImmutableList subscriptions = subscribe.stateless().getSubscriptions();
final ImmutableList reasonCodes = subAck.getReasonCodes();
+ final boolean countNotMatching = subscriptions.size() > reasonCodes.size();
for (int i = 0; i < subscriptions.size(); i++) {
- if (reasonCodes.get(i).isError()) {
+ if (countNotMatching || reasonCodes.get(i).isError()) {
remove(subscriptions.get(i).getTopicFilter(), flow);
}
}
@@ -89,9 +89,9 @@ void remove(final @NotNull MqttTopicFilterImpl topicFilter, final @Nullable Mqtt
public void unsubscribe(final @NotNull MqttStatefulUnsubscribe unsubscribe, final @NotNull MqttUnsubAck unsubAck) {
final ImmutableList topicFilters = unsubscribe.stateless().getTopicFilters();
final ImmutableList reasonCodes = unsubAck.getReasonCodes();
- final boolean areAllSuccess = reasonCodes == Mqtt3UnsubAckView.REASON_CODES_ALL_SUCCESS;
+ final boolean allSuccess = reasonCodes == Mqtt3UnsubAckView.REASON_CODES_ALL_SUCCESS;
for (int i = 0; i < topicFilters.size(); i++) {
- if (areAllSuccess || !reasonCodes.get(i).isError()) {
+ if (allSuccess || !reasonCodes.get(i).isError()) {
unsubscribe(topicFilters.get(i));
}
}
@@ -106,17 +106,16 @@ void cancel(final @NotNull MqttSubscribedPublishFlow flow) {
}
@NotNull HandleList findMatching(final @NotNull MqttStatefulPublish publish) {
- final HandleList matchingFlows = new HandleList<>();
+ final MqttMatchingPublishFlows matchingFlows = new MqttMatchingPublishFlows();
findMatching(publish, matchingFlows);
return matchingFlows;
}
void findMatching(
- final @NotNull MqttStatefulPublish publish,
- final @NotNull HandleList matchingFlows) {
+ final @NotNull MqttStatefulPublish publish, final @NotNull MqttMatchingPublishFlows matchingFlows) {
- final MqttTopicImpl topic = publish.stateless().getTopic();
- if (subscriptionFlows.findMatching(topic, matchingFlows) || !matchingFlows.isEmpty()) {
+ subscriptionFlows.findMatching(publish.stateless().getTopic(), matchingFlows);
+ if (matchingFlows.subscriptionFound) {
add(matchingFlows, globalFlows[MqttGlobalPublishFilter.SUBSCRIBED.ordinal()]);
} else {
add(matchingFlows, globalFlows[MqttGlobalPublishFilter.UNSOLICITED.ordinal()]);
diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlowsWithId.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlowsWithId.java
index b3022101c..1df7da040 100644
--- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlowsWithId.java
+++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttIncomingPublishFlowsWithId.java
@@ -23,20 +23,16 @@
import com.hivemq.client.internal.mqtt.message.publish.MqttStatefulPublish;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttStatefulSubscribe;
import com.hivemq.client.internal.mqtt.message.subscribe.suback.MqttSubAck;
-import com.hivemq.client.internal.util.collections.HandleList;
import com.hivemq.client.internal.util.collections.ImmutableIntList;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import javax.inject.Inject;
import java.util.HashMap;
-import java.util.function.Consumer;
import static com.hivemq.client.internal.mqtt.message.subscribe.MqttStatefulSubscribe.DEFAULT_NO_SUBSCRIPTION_IDENTIFIER;
/**
- * single threaded, in channel eventloop
- *
* @author Silvio Giebl
*/
@ClientScope
@@ -45,7 +41,6 @@ public class MqttIncomingPublishFlowsWithId extends MqttIncomingPublishFlows {
private final @NotNull HashMap flowsWithIdsMap = new HashMap<>();
private final @NotNull MqttSubscriptionFlows flowsWithIds;
- private final @NotNull Consumer flowWithIdUnsubscribedCallback = this::unsubscribed;
@Inject
MqttIncomingPublishFlowsWithId(
@@ -62,8 +57,8 @@ public void subscribe(
if (flow != null) {
final int subscriptionIdentifier = subscribe.getSubscriptionIdentifier();
if (subscriptionIdentifier != DEFAULT_NO_SUBSCRIPTION_IDENTIFIER) {
- flowsWithIdsMap.put(subscriptionIdentifier, flow);
flow.setSubscriptionIdentifier(subscriptionIdentifier);
+ flowsWithIdsMap.put(subscriptionIdentifier, flow);
}
}
super.subscribe(subscribe, flow);
@@ -103,15 +98,12 @@ void remove(final @NotNull MqttTopicFilterImpl topicFilter, final @Nullable Mqtt
@Override
void unsubscribe(final @NotNull MqttTopicFilterImpl topicFilter) {
- flowsWithIds.unsubscribe(topicFilter, flowWithIdUnsubscribedCallback);
+ flowsWithIds.unsubscribe(topicFilter, this::unsubscribed);
super.unsubscribe(topicFilter);
}
private void unsubscribed(final @NotNull MqttSubscribedPublishFlow flow) {
- final int subscriptionIdentifier = flow.getSubscriptionIdentifier();
- if (subscriptionIdentifier != DEFAULT_NO_SUBSCRIPTION_IDENTIFIER) {
- flowsWithIdsMap.remove(subscriptionIdentifier);
- }
+ flowsWithIdsMap.remove(flow.getSubscriptionIdentifier());
}
@Override
@@ -127,18 +119,21 @@ void cancel(final @NotNull MqttSubscribedPublishFlow flow) {
@Override
void findMatching(
- final @NotNull MqttStatefulPublish publish,
- final @NotNull HandleList matchingFlows) {
+ final @NotNull MqttStatefulPublish publish, final @NotNull MqttMatchingPublishFlows matchingFlows) {
final ImmutableIntList subscriptionIdentifiers = publish.getSubscriptionIdentifiers();
if (!subscriptionIdentifiers.isEmpty()) {
for (int i = 0; i < subscriptionIdentifiers.size(); i++) {
- final int subscriptionIdentifier = subscriptionIdentifiers.get(i);
- final MqttSubscribedPublishFlow flow = flowsWithIdsMap.get(subscriptionIdentifier);
+ final MqttSubscribedPublishFlow flow = flowsWithIdsMap.get(subscriptionIdentifiers.get(i));
if (flow != null) {
matchingFlows.add(flow);
}
}
+ if (matchingFlows.isEmpty()) {
+ flowsWithIds.findMatching(publish.stateless().getTopic(), matchingFlows);
+ } else {
+ matchingFlows.subscriptionFound = true;
+ }
}
super.findMatching(publish, matchingFlows);
}
diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttMatchingPublishFlows.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttMatchingPublishFlows.java
new file mode 100644
index 000000000..18836af06
--- /dev/null
+++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttMatchingPublishFlows.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 dc-square and the HiveMQ MQTT Client Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.hivemq.client.internal.mqtt.handler.publish.incoming;
+
+import com.hivemq.client.internal.util.collections.HandleList;
+
+/**
+ * @author Silvio Giebl
+ */
+class MqttMatchingPublishFlows extends HandleList {
+
+ boolean subscriptionFound;
+}
diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowList.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowList.java
index 0d783ebce..971a561e2 100644
--- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowList.java
+++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowList.java
@@ -121,8 +121,8 @@ public void cancel(final @NotNull MqttSubscribedPublishFlow flow) {
}
@Override
- public boolean findMatching(
- final @NotNull MqttTopicImpl topic, final @NotNull HandleList matchingFlows) {
+ public void findMatching(
+ final @NotNull MqttTopicImpl topic, final @NotNull MqttMatchingPublishFlows matchingFlows) {
for (final MqttSubscribedPublishFlow flow : flows) {
for (final MqttTopicFilterImpl topicFilter : flow.getTopicFilters()) {
@@ -133,14 +133,15 @@ public boolean findMatching(
}
}
if (!matchingFlows.isEmpty()) {
- return true;
+ matchingFlows.subscriptionFound = true;
+ return;
}
for (final MqttTopicFilterImpl subscribedTopicFilter : subscribedTopicFilters.keySet()) {
if (subscribedTopicFilter.matches(topic)) {
- return true;
+ matchingFlows.subscriptionFound = true;
+ return;
}
}
- return false;
}
@Override
diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTree.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTree.java
index 409340f05..ab4007a37 100644
--- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTree.java
+++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTree.java
@@ -18,10 +18,7 @@
package com.hivemq.client.internal.mqtt.handler.publish.incoming;
import com.hivemq.client.internal.annotations.NotThreadSafe;
-import com.hivemq.client.internal.mqtt.datatypes.MqttTopicFilterImpl;
-import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl;
-import com.hivemq.client.internal.mqtt.datatypes.MqttTopicLevel;
-import com.hivemq.client.internal.util.ByteArray;
+import com.hivemq.client.internal.mqtt.datatypes.*;
import com.hivemq.client.internal.util.collections.HandleList;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -37,8 +34,6 @@
@NotThreadSafe
public class MqttSubscriptionFlowTree implements MqttSubscriptionFlows {
- private static final @NotNull ByteArray ROOT_LEVEL = new ByteArray(new byte[0]);
-
private @Nullable TopicTreeNode rootNode;
@Inject
@@ -48,20 +43,25 @@ public class MqttSubscriptionFlowTree implements MqttSubscriptionFlows {
public void subscribe(
final @NotNull MqttTopicFilterImpl topicFilter, final @Nullable MqttSubscribedPublishFlow flow) {
- final MqttTopicLevel level = MqttTopicLevel.root(topicFilter);
final TopicTreeEntry entry = (flow == null) ? null : new TopicTreeEntry(flow, topicFilter);
- if (rootNode == null) {
- rootNode = new TopicTreeNode(ROOT_LEVEL, level, entry);
- } else {
- rootNode.subscribe(level, entry);
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(topicFilter);
+ TopicTreeNode node = rootNode;
+ if (node == null) {
+ rootNode = node = new TopicTreeNode(null, null);
+ }
+ while (node != null) {
+ node = node.subscribe(topicIterator, entry);
}
}
@Override
public void remove(final @NotNull MqttTopicFilterImpl topicFilter, final @Nullable MqttSubscribedPublishFlow flow) {
- if ((rootNode != null) && rootNode.remove(MqttTopicLevel.root(topicFilter), flow)) {
- rootNode = null;
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(topicFilter);
+ TopicTreeNode node = rootNode;
+ while (node != null) {
+ node = node.remove(topicIterator, flow);
}
+ compact();
}
@Override
@@ -69,31 +69,47 @@ public void unsubscribe(
final @NotNull MqttTopicFilterImpl topicFilter,
final @Nullable Consumer unsubscribedCallback) {
- if ((rootNode != null) && rootNode.unsubscribe(MqttTopicLevel.root(topicFilter), unsubscribedCallback)) {
- rootNode = null;
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(topicFilter);
+ TopicTreeNode node = rootNode;
+ while (node != null) {
+ node = node.unsubscribe(topicIterator, unsubscribedCallback);
}
+ compact();
}
@Override
public void cancel(final @NotNull MqttSubscribedPublishFlow flow) {
- if (rootNode != null) {
- for (final MqttTopicFilterImpl topicFilter : flow.getTopicFilters()) {
- rootNode.cancel(MqttTopicLevel.root(topicFilter), flow);
+ for (final MqttTopicFilterImpl topicFilter : flow.getTopicFilters()) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(topicFilter);
+ TopicTreeNode node = rootNode;
+ while (node != null) {
+ node = node.cancel(topicIterator, flow);
}
}
}
@Override
- public boolean findMatching(
- final @NotNull MqttTopicImpl topic, final @NotNull HandleList matchingFlows) {
+ public void findMatching(
+ final @NotNull MqttTopicImpl topic, final @NotNull MqttMatchingPublishFlows matchingFlows) {
- return (rootNode != null) && rootNode.findMatching(MqttTopicLevel.root(topic), matchingFlows);
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(topic);
+ TopicTreeNode node = rootNode;
+ while (node != null) {
+ node = node.findMatching(topicIterator, matchingFlows);
+ }
}
@Override
public void clear(final @NotNull Throwable cause) {
- if (rootNode != null) {
- rootNode.clear(cause);
+ TopicTreeNode node = rootNode;
+ while (node != null) {
+ node = node.clear(cause);
+ }
+ rootNode = null;
+ }
+
+ private void compact() {
+ if ((rootNode != null) && rootNode.isEmpty()) {
rootNode = null;
}
}
@@ -111,32 +127,46 @@ private static class TopicTreeEntry {
private static class TopicTreeNode {
- private final @NotNull ByteArray parentLevel;
- private @Nullable HashMap next;
+ private @Nullable TopicTreeNode parent;
+ private @Nullable MqttTopicLevel topicLevel;
+ private @Nullable HashMap next;
+ private @Nullable TopicTreeNode singleLevel;
private @Nullable HandleList entries;
private @Nullable HandleList multiLevelEntries;
private int subscriptions;
private int multiLevelSubscriptions;
- private boolean hasSingleLevelSubscription;
- TopicTreeNode(
- final @NotNull ByteArray parentLevel, final @Nullable MqttTopicLevel level,
- final @Nullable TopicTreeEntry entry) {
-
- this.parentLevel = parentLevel;
- subscribe(level, entry);
+ TopicTreeNode(final @Nullable TopicTreeNode parent, final @Nullable MqttTopicLevel topicLevel) {
+ this.parent = parent;
+ this.topicLevel = topicLevel;
}
- void subscribe(final @Nullable MqttTopicLevel level, final @Nullable TopicTreeEntry entry) {
- if (level == null) {
- if (entry != null) {
- if (entries == null) {
- entries = new HandleList<>();
+ @Nullable TopicTreeNode subscribe(
+ final @NotNull MqttTopicIterator topicIterator, final @Nullable TopicTreeEntry entry) {
+
+ if (topicIterator.hasNext()) {
+ final MqttTopicLevel nextLevel = topicIterator.next();
+ if (nextLevel.isSingleLevelWildcard()) {
+ if (singleLevel == null) {
+ return singleLevel = new TopicTreeNode(this, nextLevel.trim());
}
- entries.add(entry);
+ return getNext(singleLevel, topicIterator);
}
- subscriptions++;
- } else if (level.isMultiLevelWildcard()) {
+ TopicTreeNode node;
+ if (next == null) {
+ next = new HashMap<>();
+ node = null;
+ } else {
+ node = next.get(nextLevel);
+ }
+ if (node == null) {
+ node = new TopicTreeNode(this, nextLevel.trim());
+ next.put(node.topicLevel, node);
+ return node;
+ }
+ return getNext(node, topicIterator);
+ }
+ if (topicIterator.hasMultiLevelWildcard()) {
if (entry != null) {
if (multiLevelEntries == null) {
multiLevelEntries = new HandleList<>();
@@ -145,47 +175,36 @@ void subscribe(final @Nullable MqttTopicLevel level, final @Nullable TopicTreeEn
}
multiLevelSubscriptions++;
} else {
- final TopicTreeNode node;
- if (next == null) {
- next = new HashMap<>();
- node = null;
- } else {
- node = next.get(level);
- }
- if (node == null) {
- if (level.isSingleLevelWildcard()) {
- hasSingleLevelSubscription = true;
+ if (entry != null) {
+ if (entries == null) {
+ entries = new HandleList<>();
}
- final ByteArray levelCopy = level.copy();
- next.put(levelCopy, new TopicTreeNode(levelCopy, level.next(), entry));
- } else {
- node.subscribe(level.next(), entry);
+ entries.add(entry);
}
+ subscriptions++;
}
+ return null;
}
- boolean remove(final @Nullable MqttTopicLevel level, final @Nullable MqttSubscribedPublishFlow flow) {
- if (level == null) {
- if (remove(entries, flow)) {
- entries = null;
- }
- subscriptions--;
- return (subscriptions == 0) && (multiLevelSubscriptions == 0) && (next == null);
+ @Nullable TopicTreeNode remove(
+ final @NotNull MqttTopicIterator topicIterator, final @Nullable MqttSubscribedPublishFlow flow) {
+
+ if (topicIterator.hasNext()) {
+ return traverseNext(topicIterator);
}
- if (level.isMultiLevelWildcard()) {
+ if (topicIterator.hasMultiLevelWildcard()) {
if (remove(multiLevelEntries, flow)) {
multiLevelEntries = null;
}
multiLevelSubscriptions--;
- return (subscriptions == 0) && (multiLevelSubscriptions == 0) && (next == null);
- }
- if (next != null) {
- final TopicTreeNode node = next.get(level);
- if ((node != null) && node.remove(level.next(), flow)) {
- return removeNext(node);
+ } else {
+ if (remove(entries, flow)) {
+ entries = null;
}
+ subscriptions--;
}
- return false;
+ compact();
+ return null;
}
private static boolean remove(
@@ -205,29 +224,24 @@ private static boolean remove(
return false;
}
- boolean unsubscribe(
- final @Nullable MqttTopicLevel level,
+ @Nullable TopicTreeNode unsubscribe(
+ final @NotNull MqttTopicIterator topicIterator,
final @Nullable Consumer unsubscribedCallback) {
- if (level == null) {
- unsubscribe(entries, unsubscribedCallback);
- entries = null;
- subscriptions = 0;
- return (multiLevelSubscriptions == 0) && (next == null);
+ if (topicIterator.hasNext()) {
+ return traverseNext(topicIterator);
}
- if (level.isMultiLevelWildcard()) {
+ if (topicIterator.hasMultiLevelWildcard()) {
unsubscribe(multiLevelEntries, unsubscribedCallback);
multiLevelEntries = null;
multiLevelSubscriptions = 0;
- return (subscriptions == 0) && (next == null);
- }
- if (next != null) {
- final TopicTreeNode node = next.get(level);
- if ((node != null) && node.unsubscribe(level.next(), unsubscribedCallback)) {
- return removeNext(node);
- }
+ } else {
+ unsubscribe(entries, unsubscribedCallback);
+ entries = null;
+ subscriptions = 0;
}
- return false;
+ compact();
+ return null;
}
private static void unsubscribe(
@@ -248,34 +262,22 @@ private static void unsubscribe(
}
}
- private boolean removeNext(final @NotNull TopicTreeNode node) {
- assert next != null;
- if (node.parentLevel == MqttTopicLevel.SINGLE_LEVEL_WILDCARD) {
- hasSingleLevelSubscription = false;
- }
- next.remove(node.parentLevel);
- if (next.isEmpty()) {
- next = null;
- return (subscriptions == 0) && (multiLevelSubscriptions == 0);
- }
- return false;
- }
+ @Nullable TopicTreeNode cancel(
+ final @NotNull MqttTopicIterator topicIterator, final @NotNull MqttSubscribedPublishFlow flow) {
- void cancel(final @Nullable MqttTopicLevel level, final @NotNull MqttSubscribedPublishFlow flow) {
- if (level == null) {
- if (cancel(entries, flow)) {
- entries = null;
- }
- } else if (level.isMultiLevelWildcard()) {
+ if (topicIterator.hasNext()) {
+ return traverseNext(topicIterator);
+ }
+ if (topicIterator.hasMultiLevelWildcard()) {
if (cancel(multiLevelEntries, flow)) {
multiLevelEntries = null;
}
- } else if (next != null) {
- final TopicTreeNode node = next.get(level);
- if (node != null) {
- node.cancel(level.next(), flow);
+ } else {
+ if (cancel(entries, flow)) {
+ entries = null;
}
}
+ return null;
}
private static boolean cancel(
@@ -294,28 +296,44 @@ private static boolean cancel(
return false;
}
- boolean findMatching(
- final @Nullable MqttTopicLevel level,
- final @NotNull HandleList matchingFlows) {
+ @Nullable TopicTreeNode findMatching(
+ final @NotNull MqttTopicIterator topicIterator, final @NotNull MqttMatchingPublishFlows matchingFlows) {
- if (level == null) {
- add(matchingFlows, entries);
+ if (topicIterator.hasNext()) {
add(matchingFlows, multiLevelEntries);
- return (subscriptions != 0) || (multiLevelSubscriptions != 0);
- }
- add(matchingFlows, multiLevelEntries);
- boolean subscriptionFound = (multiLevelSubscriptions != 0);
- if (next != null) {
- if (hasSingleLevelSubscription) {
- final TopicTreeNode singleLevelNode = next.get(MqttTopicLevel.SINGLE_LEVEL_WILDCARD);
- subscriptionFound |= singleLevelNode.findMatching(level.fork().next(), matchingFlows);
+ if (multiLevelSubscriptions != 0) {
+ matchingFlows.subscriptionFound = true;
+ }
+ final MqttTopicLevel nextLevel = topicIterator.next();
+ final TopicTreeNode nextNode = (next == null) ? null : next.get(nextLevel);
+ final TopicTreeNode singleLevel = this.singleLevel;
+ if (nextNode == null) {
+ return findNext(singleLevel, topicIterator);
+ }
+ if (singleLevel == null) {
+ return findNext(nextNode, topicIterator);
}
- final TopicTreeNode node = next.get(level);
- if (node != null) {
- subscriptionFound |= node.findMatching(level.next(), matchingFlows);
+ final MqttTopicIterator fork = topicIterator.fork();
+ final TopicTreeNode nextNodeNext = findNext(nextNode, topicIterator);
+ if (nextNodeNext == null) {
+ return findNext(singleLevel, topicIterator);
}
+ final TopicTreeNode singleLevelNext = findNext(singleLevel, fork);
+ if (singleLevelNext == null) {
+ return nextNodeNext;
+ }
+ TopicTreeNode node = singleLevelNext;
+ while (node != null) {
+ node = node.findMatching(fork, matchingFlows);
+ }
+ return nextNodeNext;
+ }
+ add(matchingFlows, entries);
+ add(matchingFlows, multiLevelEntries);
+ if ((subscriptions != 0) || (multiLevelSubscriptions != 0)) {
+ matchingFlows.subscriptionFound = true;
}
- return subscriptionFound;
+ return null;
}
private static void add(
@@ -329,7 +347,13 @@ private static void add(
}
}
- void clear(final @NotNull Throwable cause) {
+ @Nullable TopicTreeNode clear(final @NotNull Throwable cause) {
+ if (next != null) {
+ return next.values().iterator().next();
+ }
+ if (singleLevel != null) {
+ return singleLevel;
+ }
if (entries != null) {
for (final TopicTreeEntry entry : entries) {
entry.flow.onError(cause);
@@ -342,13 +366,135 @@ void clear(final @NotNull Throwable cause) {
}
multiLevelEntries = null;
}
+ if (parent != null) {
+ parent.removeNext(this);
+ }
+ return parent;
+ }
+
+ private @NotNull TopicTreeNode getNext(
+ final @NotNull TopicTreeNode node, final @NotNull MqttTopicIterator topicIterator) {
+
+ final MqttTopicLevel topicLevel = node.topicLevel;
+ if (topicLevel instanceof MqttTopicLevels) {
+ final MqttTopicLevels topicLevels = (MqttTopicLevels) topicLevel;
+ final int branchIndex = topicIterator.forwardWhileEqual(topicLevels);
+ final MqttTopicLevel topicLevelBefore = topicLevels.before(branchIndex);
+ if (topicLevelBefore != topicLevels) {
+ final MqttTopicLevel topicLevelAfter = topicLevels.after(branchIndex);
+ final TopicTreeNode nodeBefore = new TopicTreeNode(this, topicLevelBefore);
+ node.parent = nodeBefore;
+ node.topicLevel = topicLevelAfter;
+ if (topicLevelBefore.isSingleLevelWildcard()) {
+ singleLevel = nodeBefore;
+ } else {
+ assert next != null;
+ next.remove(topicLevels);
+ next.put(topicLevelBefore, nodeBefore);
+ }
+ if (topicLevelAfter.isSingleLevelWildcard()) {
+ nodeBefore.singleLevel = node;
+ } else {
+ nodeBefore.next = new HashMap<>();
+ nodeBefore.next.put(topicLevelAfter, node);
+ }
+ return nodeBefore;
+ }
+ }
+ return node;
+ }
+
+ private @Nullable TopicTreeNode traverseNext(final @NotNull MqttTopicIterator topicIterator) {
+ final MqttTopicLevel nextLevel = topicIterator.next();
+ if (nextLevel.isSingleLevelWildcard()) {
+ return traverseNext(singleLevel, topicIterator);
+ }
if (next != null) {
- next.values().forEach(node -> node.clear(cause));
- next = null;
+ return traverseNext(next.get(nextLevel), topicIterator);
+ }
+ return null;
+ }
+
+ private static @Nullable TopicTreeNode traverseNext(
+ final @Nullable TopicTreeNode node, final @NotNull MqttTopicIterator topicIterator) {
+
+ if (node == null) {
+ return null;
+ }
+ final MqttTopicLevel topicLevel = node.topicLevel;
+ if (topicLevel instanceof MqttTopicLevels) {
+ if (topicIterator.forwardIfEqual((MqttTopicLevels) topicLevel)) {
+ return node;
+ }
+ return null;
}
- subscriptions = 0;
- multiLevelSubscriptions = 0;
- hasSingleLevelSubscription = false;
+ return node;
+ }
+
+ private static @Nullable TopicTreeNode findNext(
+ final @Nullable TopicTreeNode node, final @NotNull MqttTopicIterator topicIterator) {
+
+ if (node == null) {
+ return null;
+ }
+ final MqttTopicLevel topicLevel = node.topicLevel;
+ if (topicLevel instanceof MqttTopicLevels) {
+ if (topicIterator.forwardIfMatch((MqttTopicLevels) topicLevel)) {
+ return node;
+ }
+ return null;
+ }
+ return node;
+ }
+
+ private void compact() {
+ if ((parent != null) && ((subscriptions + multiLevelSubscriptions) == 0)) {
+ final boolean hasSingleLevel = singleLevel != null;
+ final boolean hasNext = next != null;
+ if (!hasSingleLevel && !hasNext) {
+ parent.removeNext(this);
+ parent.compact();
+ } else if (hasSingleLevel && !hasNext) {
+ fuse(singleLevel);
+ } else if (!hasSingleLevel && next.size() == 1) {
+ fuse(next.values().iterator().next());
+ }
+ }
+ }
+
+ private void fuse(final @NotNull TopicTreeNode child) {
+ assert parent != null;
+ assert topicLevel != null;
+ assert child.parent == this;
+ assert child.topicLevel != null;
+ final TopicTreeNode parent = this.parent;
+ final MqttTopicLevels fusedTopicLevel = MqttTopicLevels.concat(topicLevel, child.topicLevel);
+ child.parent = parent;
+ child.topicLevel = fusedTopicLevel;
+ if (fusedTopicLevel.isSingleLevelWildcard()) {
+ parent.singleLevel = child;
+ } else {
+ assert parent.next != null;
+ parent.next.remove(topicLevel);
+ parent.next.put(fusedTopicLevel, child);
+ }
+ }
+
+ private void removeNext(final @NotNull TopicTreeNode node) {
+ assert node.topicLevel != null;
+ if (node.topicLevel.isSingleLevelWildcard()) {
+ singleLevel = null;
+ } else {
+ assert next != null;
+ next.remove(node.topicLevel);
+ if (next.isEmpty()) {
+ next = null;
+ }
+ }
+ }
+
+ boolean isEmpty() {
+ return ((subscriptions + multiLevelSubscriptions) == 0) && (singleLevel == null) && (next == null);
}
}
}
diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlows.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlows.java
index 9da81b9aa..a6a17bd1b 100644
--- a/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlows.java
+++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlows.java
@@ -20,7 +20,6 @@
import com.hivemq.client.internal.annotations.NotThreadSafe;
import com.hivemq.client.internal.mqtt.datatypes.MqttTopicFilterImpl;
import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl;
-import com.hivemq.client.internal.util.collections.HandleList;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -42,7 +41,7 @@ void unsubscribe(
void cancel(@NotNull MqttSubscribedPublishFlow flow);
- boolean findMatching(@NotNull MqttTopicImpl topic, @NotNull HandleList matchingFlows);
+ void findMatching(@NotNull MqttTopicImpl topic, @NotNull MqttMatchingPublishFlows matchingFlows);
void clear(@NotNull Throwable cause);
}
diff --git a/src/main/java/com/hivemq/client/internal/util/ByteArray.java b/src/main/java/com/hivemq/client/internal/util/ByteArray.java
index 1cce32448..0e33c8ff8 100644
--- a/src/main/java/com/hivemq/client/internal/util/ByteArray.java
+++ b/src/main/java/com/hivemq/client/internal/util/ByteArray.java
@@ -35,8 +35,16 @@ public int length() {
return getEnd() - getStart();
}
+ protected int getStart() {
+ return 0;
+ }
+
+ protected int getEnd() {
+ return array.length;
+ }
+
@Override
- public boolean equals(final @Nullable Object o) {
+ public final boolean equals(final @Nullable Object o) {
if (this == o) {
return true;
}
@@ -48,37 +56,7 @@ public boolean equals(final @Nullable Object o) {
}
@Override
- public int hashCode() {
+ public final int hashCode() {
return ByteArrayUtil.hashCode(array, getStart(), getEnd());
}
-
- protected int getStart() {
- return 0;
- }
-
- protected int getEnd() {
- return array.length;
- }
-
- public static class Range extends ByteArray {
-
- protected int start;
- protected int end;
-
- public Range(final @NotNull byte[] array, final int start, final int end) {
- super(array);
- this.start = start;
- this.end = end;
- }
-
- @Override
- protected int getStart() {
- return start;
- }
-
- @Override
- protected int getEnd() {
- return end;
- }
- }
}
diff --git a/src/main/java/com/hivemq/client/internal/util/ByteArrayUtil.java b/src/main/java/com/hivemq/client/internal/util/ByteArrayUtil.java
index a2791d7a0..909bab8e8 100644
--- a/src/main/java/com/hivemq/client/internal/util/ByteArrayUtil.java
+++ b/src/main/java/com/hivemq/client/internal/util/ByteArrayUtil.java
@@ -28,16 +28,16 @@ public static boolean equals(
final @NotNull byte[] array1, final int start1, final int end1, final @NotNull byte[] array2,
final int start2, final int end2) {
- if (array1 == array2) {
- return true;
- }
-
final int length1 = end1 - start1;
final int length2 = end2 - start2;
if (length1 != length2) {
return false;
}
+ if ((array1 == array2) && (start1 == start2)) {
+ return true;
+ }
+
for (int i1 = start1, i2 = start2; i1 < end1; i1++, i2++) {
if (array1[i1] != array2[i2]) {
return false;
@@ -60,7 +60,7 @@ public static int indexOf(final @NotNull byte[] array, final int start, final by
return i;
}
}
- return -1;
+ return array.length;
}
private ByteArrayUtil() {}
diff --git a/src/main/java/com/hivemq/client/internal/util/Ranges.java b/src/main/java/com/hivemq/client/internal/util/Ranges.java
index 25436b756..8c5ebed67 100644
--- a/src/main/java/com/hivemq/client/internal/util/Ranges.java
+++ b/src/main/java/com/hivemq/client/internal/util/Ranges.java
@@ -34,11 +34,59 @@ public Ranges(final int minId, final int maxId) {
}
public int getId() {
- return rootRange.getId();
+ if (rootRange.start == rootRange.end) {
+ return -1;
+ }
+ final int id = rootRange.start;
+ rootRange.start++;
+ if ((rootRange.start == rootRange.end) && (rootRange.next != null)) {
+ rootRange = rootRange.next;
+ }
+ return id;
}
public void returnId(final int id) {
- rootRange = rootRange.returnId(id);
+ Range current = rootRange;
+ if (id < current.start - 1) {
+ rootRange = new Range(id, id + 1, current);
+ return;
+ }
+ Range prev = current;
+ current = returnId(current, id);
+ while (current != null) {
+ if (id < current.start - 1) {
+ prev.next = new Range(id, id + 1, current);
+ return;
+ }
+ prev = current;
+ current = returnId(current, id);
+ }
+ }
+
+ private @Nullable Range returnId(final @NotNull Range range, final int id) {
+ final Range next = range.next;
+ if (id == range.start - 1) {
+ range.start = id;
+ return null;
+ }
+ if (id < range.end) {
+ throw new IllegalStateException("The id was already returned. This must not happen and is a bug.");
+ }
+ if (id == range.end) {
+ if (next == null) {
+ throw new IllegalStateException("The id is greater than maxId. This must not happen and is a bug.");
+ }
+ range.end++;
+ if (range.end == next.start) {
+ range.end = next.end;
+ range.next = next.next;
+ }
+ return null;
+ }
+ if (next == null) {
+ throw new IllegalStateException("The id is greater than maxId. This must not happen and is a bug.");
+ }
+ return next;
}
public int resize(final int maxId) {
@@ -82,44 +130,5 @@ private static class Range {
this.end = end;
this.next = next;
}
-
- int getId() {
- if (start == end) {
- return -1;
- }
- final int id = this.start;
- start++;
- if ((start == end) && (next != null)) {
- start = next.start;
- end = next.end;
- next = next.next;
- }
- return id;
- }
-
- @NotNull Range returnId(final int id) {
- Range range = this;
- if (id < start - 1) {
- range = new Range(id, id + 1, this);
- } else if (id == start - 1) {
- start--;
- } else if (id < end) {
- throw new IllegalStateException("The id was already returned. This must not happen and is a bug.");
- } else if (id == end) {
- if (next == null) {
- throw new IllegalStateException("The id is greater than maxId. This must not happen and is a bug.");
- }
- end++;
- if (end == next.start) {
- end = next.end;
- next = next.next;
- }
- } else if (next != null) {
- next = next.returnId(id);
- } else {
- throw new IllegalStateException("The id is greater than maxId. This must not happen and is a bug.");
- }
- return range;
- }
}
}
diff --git a/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicIteratorTest.java b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicIteratorTest.java
new file mode 100644
index 000000000..489bd5082
--- /dev/null
+++ b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicIteratorTest.java
@@ -0,0 +1,348 @@
+/*
+ * Copyright 2018 dc-square and the HiveMQ MQTT Client Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.hivemq.client.internal.mqtt.datatypes;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.NoSuchElementException;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * @author Silvio Giebl
+ */
+class MqttTopicIteratorTest {
+
+ @Test
+ void of_topic() {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicImpl.of("test/topic"));
+
+ assertTrue(topicIterator.hasNext());
+ final MqttTopicLevel level1 = topicIterator.next();
+ assertEquals(MqttTopicLevel.of("test".getBytes(), 0, 4), level1);
+ assertFalse(level1.isSingleLevelWildcard());
+
+ assertTrue(topicIterator.hasNext());
+ final MqttTopicLevel level2 = topicIterator.next();
+ assertEquals(MqttTopicLevel.of("topic".getBytes(), 0, 5), level2);
+ assertFalse(level2.isSingleLevelWildcard());
+
+ assertFalse(topicIterator.hasNext());
+ assertFalse(topicIterator.hasMultiLevelWildcard());
+ assertThrows(NoSuchElementException.class, topicIterator::next);
+ }
+
+ @Test
+ void of_topicFilter() {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of("test/+/topic/#"));
+
+ assertTrue(topicIterator.hasNext());
+ final MqttTopicLevel level1 = topicIterator.next();
+ assertEquals(MqttTopicLevel.of("test".getBytes(), 0, 4), level1);
+ assertFalse(level1.isSingleLevelWildcard());
+
+ assertTrue(topicIterator.hasNext());
+ final MqttTopicLevel level2 = topicIterator.next();
+ assertEquals(MqttTopicLevel.of("+".getBytes(), 0, 1), level2);
+ assertTrue(level2.isSingleLevelWildcard());
+
+ assertTrue(topicIterator.hasNext());
+ final MqttTopicLevel level3 = topicIterator.next();
+ assertEquals(MqttTopicLevel.of("topic".getBytes(), 0, 5), level3);
+ assertFalse(level3.isSingleLevelWildcard());
+
+ assertFalse(topicIterator.hasNext());
+ assertTrue(topicIterator.hasMultiLevelWildcard());
+ assertThrows(NoSuchElementException.class, topicIterator::next);
+ }
+
+ @Test
+ void fork() {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of("test/+/topic/#"));
+ topicIterator.next();
+
+ final MqttTopicIterator fork = topicIterator.fork();
+
+ assertTrue(topicIterator.hasNext());
+ final MqttTopicLevel level2 = topicIterator.next();
+ assertEquals(MqttTopicLevel.of("+".getBytes(), 0, 1), level2);
+ assertTrue(level2.isSingleLevelWildcard());
+ assertTrue(topicIterator.hasNext());
+ final MqttTopicLevel level3 = topicIterator.next();
+ assertEquals(MqttTopicLevel.of("topic".getBytes(), 0, 5), level3);
+ assertFalse(level3.isSingleLevelWildcard());
+ assertFalse(topicIterator.hasNext());
+ assertTrue(topicIterator.hasMultiLevelWildcard());
+ assertThrows(NoSuchElementException.class, topicIterator::next);
+
+ assertTrue(fork.hasNext());
+ final MqttTopicLevel forkLevel2 = fork.next();
+ assertEquals(MqttTopicLevel.of("+".getBytes(), 0, 1), forkLevel2);
+ assertTrue(forkLevel2.isSingleLevelWildcard());
+ assertTrue(fork.hasNext());
+ final MqttTopicLevel forkLevel3 = fork.next();
+ assertEquals(MqttTopicLevel.of("topic".getBytes(), 0, 5), forkLevel3);
+ assertFalse(forkLevel3.isSingleLevelWildcard());
+ assertFalse(fork.hasNext());
+ assertTrue(fork.hasMultiLevelWildcard());
+ assertThrows(NoSuchElementException.class, fork::next);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"test/topic, topic", "test/+, +", "test/+/#, +"})
+ void trim_topicLevel(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+
+ final MqttTopicLevel trim = topicIterator.next().trim();
+ final MqttTopicLevel level = MqttTopicLevel.of(topicLevels.getBytes(), 0, topicLevels.getBytes().length);
+ MqttTopicLevelsTest.assertEqualsWithClass(level, trim);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"test/+/topic, +/topic", "test/+/topic/#, +/topic"})
+ void trim_topicLevels(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+
+ final MqttTopicLevel trim = topicIterator.next().trim();
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ MqttTopicLevelsTest.assertEqualsWithClass(levels, trim);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"test/topic/filter, test/topic/filter", "test/+/topic, test/+/topic", "test/+/topic/#, test/+/topic"})
+ void forwardIfEqual_equalToEnd(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertTrue(topicIterator.forwardIfEqual(levels));
+
+ assertFalse(topicIterator.hasNext());
+ assertEquals(topicFilter.endsWith("#"), topicIterator.hasMultiLevelWildcard());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "test/topic/filter/abc, test/topic/filter", "test/+/topic/abc, test/+/topic",
+ "test/+/topic/abc/#, test/+/topic"
+ })
+ void forwardIfEqual_remainingLevels(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertTrue(topicIterator.forwardIfEqual(levels));
+
+ assertTrue(topicIterator.hasNext());
+ final MqttTopicLevel lastLevel = topicIterator.next();
+ assertEquals(MqttTopicLevel.of("abc".getBytes(), 0, 3), lastLevel);
+ assertFalse(lastLevel.isSingleLevelWildcard());
+ assertFalse(topicIterator.hasNext());
+ assertEquals(topicFilter.endsWith("#"), topicIterator.hasMultiLevelWildcard());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "test/topic/filter, test/topic/filter/abc", "test/+/topic, test/+/topic/abc",
+ "test/+/topic/#, test/+/topic/abc"
+ })
+ void forwardIfEqual_tooShort(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+ final int start = topicIterator.getStart();
+ final int end = topicIterator.getEnd();
+
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertFalse(topicIterator.forwardIfEqual(levels));
+
+ assertEquals(start, topicIterator.getStart());
+ assertEquals(end, topicIterator.getEnd());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "test, test/topic", "test/topic/filter, test/topic/filte2", "test/topic/filter, test/topic/filter2",
+ "test/topic/filter2, test/topic/filter", "test/+/topic, test/+/topi2", "test/+/topic, test/+/topic2",
+ "test/+/topic2, test/+/topic", "test/+/topic/#, test/+/topi2", "test/+/topic/#, test/+/topic2",
+ "test/+/topic2/#, test/+/topic", "test/topic/+, test/topic/abc"
+ })
+ void forwardIfEqual_notFullyEqual(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+ final int start = topicIterator.getStart();
+ final int end = topicIterator.getEnd();
+
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertFalse(topicIterator.forwardIfEqual(levels));
+
+ assertEquals(start, topicIterator.getStart());
+ assertEquals(end, topicIterator.getEnd());
+ }
+
+ @ParameterizedTest
+ @CsvSource({"test/topic/filter, test/topic/filter", "test/+/topic, test/+/topic", "test/+/topic/#, test/+/topic"})
+ void forwardWhileEqual_equalToEnd(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertEquals(topicLevels.length(), topicIterator.forwardWhileEqual(levels));
+
+ assertFalse(topicIterator.hasNext());
+ assertEquals(topicFilter.endsWith("#"), topicIterator.hasMultiLevelWildcard());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "test/topic/filter/abc, test/topic/filter", "test/+/topic/abc, test/+/topic",
+ "test/+/topic/abc/#, test/+/topic"
+ })
+ void forwardWhileEqual_remainingLevels(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertEquals(topicLevels.length(), topicIterator.forwardWhileEqual(levels));
+
+ assertTrue(topicIterator.hasNext());
+ final MqttTopicLevel lastLevel = topicIterator.next();
+ assertEquals(MqttTopicLevel.of("abc".getBytes(), 0, 3), lastLevel);
+ assertFalse(lastLevel.isSingleLevelWildcard());
+ assertFalse(topicIterator.hasNext());
+ assertEquals(topicFilter.endsWith("#"), topicIterator.hasMultiLevelWildcard());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "test, test/topic", "test/topic/filter, test/topic/filter/abc", "test/+/topic, test/+/topic/abc",
+ "test/+/topic/#, test/+/topic/abc"
+ })
+ void forwardWhileEqual_tooShort(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+
+ final boolean hasMultiLevelWildcard = topicFilter.endsWith("#");
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertEquals(
+ hasMultiLevelWildcard ? topicFilter.length() - 2 : topicFilter.length(),
+ topicIterator.forwardWhileEqual(levels));
+
+ assertFalse(topicIterator.hasNext());
+ assertEquals(hasMultiLevelWildcard, topicIterator.hasMultiLevelWildcard());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "test/topic/filter, test/topic/filte2, 10", "test/topic/filter, test/topic/filter2, 10",
+ "test/topic/filter2, test/topic/filter, 10", "test/+/topic, test/+/topi2, 6",
+ "test/+/topic, test/+/topic2, 6", "test/+/topic2, test/+/topic, 6", "test/+/topic/#, test/+/topi2, 6",
+ "test/+/topic/#, test/+/topic2, 6", "test/+/topic2/#, test/+/topic, 6", "test/topic/+, test/topic/abc, 10"
+ })
+ void forwardWhileEqual_notFullyEqual(
+ final @NotNull String topicFilter, final @NotNull String topicLevels, final int branchIndex) {
+
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertEquals(branchIndex, topicIterator.forwardWhileEqual(levels));
+
+ assertEquals(branchIndex, topicIterator.getStart());
+ assertEquals(branchIndex, topicIterator.getEnd());
+ }
+
+ @ParameterizedTest
+ @CsvSource(
+ {"test/topic/filter, test/topic/filter", "test/topic/filter, test/+/filter", "test/topic/filter, test/+/+"})
+ void forwardIfMatch_equalToEnd(final @NotNull String topic, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicImpl.of(topic));
+ topicIterator.next();
+
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertTrue(topicIterator.forwardIfMatch(levels));
+
+ assertFalse(topicIterator.hasNext());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "test/topic/filter/abc, test/topic/filter", "test/topic/filter/abc, test/+/filter",
+ "test/topic/filter/abc, test/+/+"
+ })
+ void forwardIfMatch_remainingLevels(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertTrue(topicIterator.forwardIfMatch(levels));
+
+ assertTrue(topicIterator.hasNext());
+ final MqttTopicLevel lastLevel = topicIterator.next();
+ assertEquals(MqttTopicLevel.of("abc".getBytes(), 0, 3), lastLevel);
+ assertFalse(lastLevel.isSingleLevelWildcard());
+ assertFalse(topicIterator.hasNext());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "test/topic/filter, test/topic/filter/abc", "test/topic/filter, test/+/filter/abc",
+ "test/topic/filter, test/+/+/abc", "test/topic/filter, test/+/filter/+"
+ })
+ void forwardIfMatch_tooShort(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+ final int start = topicIterator.getStart();
+ final int end = topicIterator.getEnd();
+
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertFalse(topicIterator.forwardIfMatch(levels));
+
+ assertEquals(start, topicIterator.getStart());
+ assertEquals(end, topicIterator.getEnd());
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "test, test/topic", "test/topic/filter, test/topic/filte2", "test/topic/filter, test/topic/filter2",
+ "test/topic/filter2, test/topic/filter", "test/topic/filter, test/+/filte2",
+ "test/topic/filter, test/+/filter2", "test/topic/filter2, test/+/filter"
+ })
+ void forwardIfMatch_notFullyEqual(final @NotNull String topicFilter, final @NotNull String topicLevels) {
+ final MqttTopicIterator topicIterator = MqttTopicIterator.of(MqttTopicFilterImpl.of(topicFilter));
+ topicIterator.next();
+ final int start = topicIterator.getStart();
+ final int end = topicIterator.getEnd();
+
+ final MqttTopicLevels levels = createTopicLevels(topicLevels);
+ assertFalse(topicIterator.forwardIfMatch(levels));
+
+ assertEquals(start, topicIterator.getStart());
+ assertEquals(end, topicIterator.getEnd());
+ }
+
+ private static @NotNull MqttTopicLevels createTopicLevels(final @NotNull String levels) {
+ final byte[] bytes = levels.getBytes();
+ final int firstEnd = levels.indexOf('/');
+ return new MqttTopicLevels(bytes, (firstEnd == -1) ? bytes.length : firstEnd);
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevelTest.java b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevelTest.java
new file mode 100644
index 000000000..570fef126
--- /dev/null
+++ b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevelTest.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2018 dc-square and the HiveMQ MQTT Client Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.hivemq.client.internal.mqtt.datatypes;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * @author Silvio Giebl
+ */
+class MqttTopicLevelTest {
+
+ @Test
+ void of_singleLevelWildcardIsSame() {
+ final MqttTopicLevel topicLevel1 = MqttTopicLevel.of("+".getBytes(), 0, 1);
+ final MqttTopicLevel topicLevel2 = MqttTopicLevel.of("+".getBytes(), 0, 1);
+ final MqttTopicLevel topicLevel3 = MqttTopicLevel.of("+/abc".getBytes(), 0, 1);
+ assertSame(topicLevel1, topicLevel2);
+ assertSame(topicLevel1, topicLevel3);
+ assertEquals(1, topicLevel1.getArray().length);
+ assertEquals(1, topicLevel2.getArray().length);
+ assertEquals(1, topicLevel3.getArray().length);
+ }
+
+ @Test
+ void isSingleLevelWildcard() {
+ final MqttTopicLevel topicLevel1 = MqttTopicLevel.of("+".getBytes(), 0, 1);
+ final MqttTopicLevel topicLevel2 = MqttTopicLevel.of("+/abc".getBytes(), 0, 1);
+ final MqttTopicLevel topicLevel3 = MqttTopicLevel.of("+/abc/def".getBytes(), 0, 1);
+ assertTrue(topicLevel1.isSingleLevelWildcard());
+ assertTrue(topicLevel2.isSingleLevelWildcard());
+ assertTrue(topicLevel3.isSingleLevelWildcard());
+ assertEquals(1, topicLevel1.getArray().length);
+ assertEquals(1, topicLevel2.getArray().length);
+ assertEquals(1, topicLevel3.getArray().length);
+ final MqttTopicLevel topicLevel4 = MqttTopicLevel.of("abc/+".getBytes(), 0, 3);
+ final MqttTopicLevel topicLevel5 = MqttTopicLevel.of("abc/+/def".getBytes(), 0, 3);
+ assertFalse(topicLevel4.isSingleLevelWildcard());
+ assertFalse(topicLevel5.isSingleLevelWildcard());
+ assertEquals(3, topicLevel4.getArray().length);
+ assertEquals(3, topicLevel5.getArray().length);
+ }
+
+ @Test
+ void trim_isSame() {
+ final MqttTopicLevel topicLevel1 = MqttTopicLevel.of("+".getBytes(), 0, 1);
+ final MqttTopicLevel topicLevel2 = MqttTopicLevel.of("abc".getBytes(), 0, 3);
+ final MqttTopicLevel topicLevel3 = MqttTopicLevel.of("abc/def".getBytes(), 0, 3);
+ assertSame(topicLevel1, topicLevel1.trim());
+ assertSame(topicLevel2, topicLevel2.trim());
+ assertSame(topicLevel3, topicLevel3.trim());
+ }
+
+ @Test
+ void equals() {
+ EqualsVerifier.forClass(MqttTopicLevel.class).verify();
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevelsTest.java b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevelsTest.java
new file mode 100644
index 000000000..ee9ff6706
--- /dev/null
+++ b/src/test/java/com/hivemq/client/internal/mqtt/datatypes/MqttTopicLevelsTest.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2018 dc-square and the HiveMQ MQTT Client Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.hivemq.client.internal.mqtt.datatypes;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * @author Silvio Giebl
+ */
+class MqttTopicLevelsTest {
+
+ @Test
+ void concat_topicLevel() {
+ final MqttTopicLevel level1 = MqttTopicLevel.of("test".getBytes(), 0, 4);
+ final MqttTopicLevel level2 = MqttTopicLevel.of("topic".getBytes(), 0, 5);
+ final MqttTopicLevels concat = MqttTopicLevels.concat(level1, level2);
+ assertEquals(level1, concat);
+ assertEquals(4, concat.getEnd());
+ assertEquals(4 + 1 + 5, concat.getArray().length);
+ assertEqualsWithClass(level1, concat.before(4));
+ assertEqualsWithClass(level2, concat.after(4));
+ }
+
+ @Test
+ void concat_topicLevelAndTopicLevels() {
+ final MqttTopicLevel level1 = MqttTopicLevel.of("test".getBytes(), 0, 4);
+ final MqttTopicLevel level2 = MqttTopicLevel.of("topic".getBytes(), 0, 5);
+ final MqttTopicLevel level3 = MqttTopicLevel.of("filter".getBytes(), 0, 6);
+ final MqttTopicLevels levels1 = MqttTopicLevels.concat(level1, level2);
+ final MqttTopicLevels levels2 = MqttTopicLevels.concat(level2, level3);
+ final MqttTopicLevels concat1 = MqttTopicLevels.concat(levels1, level3);
+ final MqttTopicLevels concat2 = MqttTopicLevels.concat(level1, levels2);
+ assertEquals(level1, concat1);
+ assertEquals(level1, concat2);
+ assertEquals(4, concat1.getEnd());
+ assertEquals(4, concat2.getEnd());
+ assertEquals(4 + 1 + 5 + 1 + 6, concat1.getArray().length);
+ assertEquals(4 + 1 + 5 + 1 + 6, concat2.getArray().length);
+ assertEqualsWithClass(level1, concat1.before(4));
+ assertEqualsWithClass(level1, concat2.before(4));
+ assertEqualsWithClass(levels1, concat1.before(4 + 1 + 5));
+ assertEqualsWithClass(levels1, concat2.before(4 + 1 + 5));
+ assertEqualsWithClass(levels2, concat1.after(4));
+ assertEqualsWithClass(levels2, concat2.after(4));
+ assertEqualsWithClass(level3, concat1.after(4 + 1 + 5));
+ assertEqualsWithClass(level3, concat2.after(4 + 1 + 5));
+ }
+
+ @Test
+ void concat_TopicLevels() {
+ final MqttTopicLevel level1 = MqttTopicLevel.of("test".getBytes(), 0, 4);
+ final MqttTopicLevel level2 = MqttTopicLevel.of("topic".getBytes(), 0, 5);
+ final MqttTopicLevel level3 = MqttTopicLevel.of("filter".getBytes(), 0, 6);
+ final MqttTopicLevel level4 = MqttTopicLevel.of("abc".getBytes(), 0, 3);
+ final MqttTopicLevels levels1 = MqttTopicLevels.concat(level1, level2);
+ final MqttTopicLevels levels2 = MqttTopicLevels.concat(level3, level4);
+ final MqttTopicLevels levels3 = MqttTopicLevels.concat(levels1, level3);
+ final MqttTopicLevels levels4 = MqttTopicLevels.concat(level2, levels2);
+ final MqttTopicLevels concat = MqttTopicLevels.concat(levels1, levels2);
+ assertEquals(level1, concat);
+ assertEquals(4, concat.getEnd());
+ assertEquals(4 + 1 + 5 + 1 + 6 + 1 + 3, concat.getArray().length);
+ assertEqualsWithClass(level1, concat.before(4));
+ assertEqualsWithClass(levels1, concat.before(4 + 1 + 5));
+ assertEqualsWithClass(levels3, concat.before(4 + 1 + 5 + 1 + 6));
+ assertEqualsWithClass(levels4, concat.after(4));
+ assertEqualsWithClass(levels2, concat.after(4 + 1 + 5));
+ assertEqualsWithClass(level4, concat.after(4 + 1 + 5 + 1 + 6));
+ }
+
+ @Test
+ void before_allIsSame() {
+ final MqttTopicLevel level1 = MqttTopicLevel.of("test".getBytes(), 0, 4);
+ final MqttTopicLevel level2 = MqttTopicLevel.of("topic".getBytes(), 0, 5);
+ final MqttTopicLevel level3 = MqttTopicLevel.of("filter".getBytes(), 0, 6);
+ final MqttTopicLevels levels1 = MqttTopicLevels.concat(level1, level2);
+ final MqttTopicLevels concat = MqttTopicLevels.concat(levels1, level3);
+ assertSame(concat, concat.before(4 + 1 + 5 + 1 + 6));
+ }
+
+ static void assertEqualsWithClass(final @NotNull MqttTopicLevel level1, final @NotNull MqttTopicLevel level2) {
+ assertEquals(level1, level2);
+ assertSame(level1.getClass(), level2.getClass());
+ assertArrayEquals(level1.trim().getArray(), level2.trim().getArray());
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTreeTest.java b/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTreeTest.java
index 816a1c94f..a5fb107a8 100644
--- a/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTreeTest.java
+++ b/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowTreeTest.java
@@ -17,6 +17,14 @@
package com.hivemq.client.internal.mqtt.handler.publish.incoming;
+import com.hivemq.client.internal.mqtt.datatypes.MqttTopicFilterImpl;
+import com.hivemq.client.internal.mqtt.datatypes.MqttTopicImpl;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import static org.junit.jupiter.api.Assertions.*;
+
/**
* @author Silvio Giebl
*/
@@ -26,4 +34,105 @@ class MqttSubscriptionFlowTreeTest extends MqttSubscriptionFlowsTest {
super(MqttSubscriptionFlowTree::new);
}
+ @ParameterizedTest
+ @CsvSource({
+ // split single level before and after
+ "unsubscribe, test/topic1, test/topic2, test/topic3, test/topic1, test/topic2, test/topic3",
+ // split single level before, single level wildcard after
+ "unsubscribe, test/+, test/topic2, test/topic3, test/topic1, test/topic2, test/topic3",
+ // split single level before, single level wildcard after, fuse different order
+ "unsubscribe, test/topic1, test/topic2, test/+, test/topic1, test/topic2, test/topic3",
+ // split multiple levels before, single level after
+ "unsubscribe, test/topic/filter1, test/topic/filter2, test/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3",
+ // split multiple levels before, single level wildcard after
+ "unsubscribe, test/topic/+, test/topic/filter2, test/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3",
+ // split multiple levels before, single level wildcard after, fuse different order
+ "unsubscribe, test/topic/filter1, test/topic/filter2, test/topic/+, test/topic/filter1, test/topic/filter2, test/topic/filter3",
+ // split single level before, multiple levels after
+ "unsubscribe, test/topic1/filter, test/topic2/filter, test/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter",
+ // split single level before, single level wildcard with multiple levels after
+ "unsubscribe, test/+/filter, test/topic2/filter, test/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter",
+ // split single level before, single level wildcard with multiple levels after, fuse different order
+ "unsubscribe, test/topic1/filter, test/topic2/filter, test/+/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter",
+ // split multiple levels before, multiple levels after
+ "unsubscribe, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc",
+ // split multi levels before, single level wildcard with multiple levels after
+ "unsubscribe, test/topic/+/abc, test/topic/filter2/abc, test/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc",
+ // split multi levels before, single level wildcard with multiple levels after, fuse different order
+ "unsubscribe, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/+/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc",
+ // split single level wildcard before, single level after
+ "unsubscribe, +/topic1, +/topic2, +/topic3, test/topic1, test/topic2, test/topic3",
+ // split single level wildcard before, multiple levels after
+ "unsubscribe, +/topic1/filter, +/topic2/filter, +/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter",
+ // split single level wildcard with multiple levels before, single level after
+ "unsubscribe, +/topic/filter1, +/topic/filter2, +/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3",
+ // split single level wildcard with multiple levels before, multiple levels after
+ "unsubscribe, +/topic/filter1/abc, +/topic/filter2/abc, +/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc",
+ // split linear chain
+ "unsubscribe, test/topic/filter, test/topic//filter, test/topic///filter, test/topic/filter, test/topic//filter, test/topic///filter",
+ // split linear chain, do not fuse
+ "unsubscribe, test/topic/filter, test/topic//filter, test/topic///filter, test/topic///filter, test/topic//filter, test/topic/filter",
+ "remove, test/topic1, test/topic2, test/topic3, test/topic1, test/topic2, test/topic3",
+ "remove, test/+, test/topic2, test/topic3, test/topic1, test/topic2, test/topic3",
+ "remove, test/topic1, test/topic2, test/+, test/topic1, test/topic2, test/topic3",
+ "remove, test/topic/filter1, test/topic/filter2, test/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3",
+ "remove, test/topic/+, test/topic/filter2, test/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3",
+ "remove, test/topic/filter1, test/topic/filter2, test/topic/+, test/topic/filter1, test/topic/filter2, test/topic/filter3",
+ "remove, test/topic1/filter, test/topic2/filter, test/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter",
+ "remove, test/+/filter, test/topic2/filter, test/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter",
+ "remove, test/topic1/filter, test/topic2/filter, test/+/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter",
+ "remove, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc",
+ "remove, test/topic/+/abc, test/topic/filter2/abc, test/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc",
+ "remove, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/+/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc",
+ "remove, +/filter1, +/filter2, +/filter3, topic/filter1, topic/filter2, topic/filter3",
+ "remove, +/topic1/filter, +/topic2/filter, +/topic3/filter, test/topic1/filter, test/topic2/filter, test/topic3/filter",
+ "remove, +/topic/filter1, +/topic/filter2, +/topic/filter3, test/topic/filter1, test/topic/filter2, test/topic/filter3",
+ "remove, +/topic/filter1/abc, +/topic/filter2/abc, +/topic/filter3/abc, test/topic/filter1/abc, test/topic/filter2/abc, test/topic/filter3/abc",
+ "remove, test/topic/filter, test/topic//filter, test/topic///filter, test/topic/filter, test/topic//filter, test/topic///filter",
+ "remove, test/topic/filter, test/topic//filter, test/topic///filter, test/topic///filter, test/topic//filter, test/topic/filter",
+ })
+ void branching_compaction(
+ final @NotNull String compactOperation, final @NotNull String filter1, final @NotNull String filter2,
+ final @NotNull String filter3, final @NotNull String topic1, final @NotNull String topic2,
+ final @NotNull String topic3) {
+
+ flows.subscribe(MqttTopicFilterImpl.of(filter1), null);
+ flows.subscribe(MqttTopicFilterImpl.of(filter2), null);
+ flows.subscribe(MqttTopicFilterImpl.of(filter3), null);
+
+ final MqttMatchingPublishFlows matching1 = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic1), matching1);
+ assertTrue(matching1.subscriptionFound);
+ final MqttMatchingPublishFlows matching2 = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic2), matching2);
+ assertTrue(matching2.subscriptionFound);
+ final MqttMatchingPublishFlows matching3 = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic3), matching3);
+ assertTrue(matching3.subscriptionFound);
+
+ switch (compactOperation) {
+ case "unsubscribe":
+ flows.unsubscribe(MqttTopicFilterImpl.of(filter1), null);
+ flows.unsubscribe(MqttTopicFilterImpl.of(filter2), null);
+ flows.unsubscribe(MqttTopicFilterImpl.of(filter3), null);
+ break;
+ case "remove":
+ flows.remove(MqttTopicFilterImpl.of(filter1), null);
+ flows.remove(MqttTopicFilterImpl.of(filter2), null);
+ flows.remove(MqttTopicFilterImpl.of(filter3), null);
+ break;
+ default:
+ fail();
+ }
+
+ final MqttMatchingPublishFlows matching4 = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic1), matching4);
+ assertFalse(matching4.subscriptionFound);
+ final MqttMatchingPublishFlows matching5 = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic2), matching5);
+ assertFalse(matching5.subscriptionFound);
+ final MqttMatchingPublishFlows matching6 = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic3), matching6);
+ assertFalse(matching6.subscriptionFound);
+ }
}
diff --git a/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowsTest.java b/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowsTest.java
index 2ee5f63e6..5a9202a4a 100644
--- a/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowsTest.java
+++ b/src/test/java/com/hivemq/client/internal/mqtt/handler/publish/incoming/MqttSubscriptionFlowsTest.java
@@ -23,6 +23,7 @@
import com.hivemq.client.internal.util.collections.HandleList;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.converter.ArgumentConversionException;
import org.junit.jupiter.params.converter.ConvertWith;
@@ -32,8 +33,7 @@
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
/**
* @author Silvio Giebl
@@ -52,7 +52,7 @@ public static class CsvToArray extends SimpleArgumentConverter {
private final @NotNull Supplier flowsSupplier;
@SuppressWarnings("NullabilityAnnotations")
- private MqttSubscriptionFlows flows;
+ MqttSubscriptionFlows flows;
MqttSubscriptionFlowsTest(final @NotNull Supplier flowsSupplier) {
this.flowsSupplier = flowsSupplier;
@@ -81,8 +81,9 @@ void subscribe_matchingTopicFilters_doMatch(
matchingFlows[i] = flow;
}
- final HandleList matching = new HandleList<>();
- assertTrue(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertTrue(matching.subscriptionFound);
assertFalse(matching.isEmpty());
assertEquals(ImmutableSet.copyOf(matchingFlows), ImmutableSet.copyOf(matching));
}
@@ -100,8 +101,9 @@ void subscribe_matchingTopicFilters_doMatch_noFlow(
flows.subscribe(MqttTopicFilterImpl.of(matchingTopicFilter), null);
}
- final HandleList matching = new HandleList<>();
- assertTrue(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertTrue(matching.subscriptionFound);
assertTrue(matching.isEmpty());
}
@@ -122,8 +124,9 @@ void subscribe_nonMatchingTopicFilters_doNotMatch(
assertEquals(ImmutableSet.of(topicFilter), ImmutableSet.copyOf(flow.getTopicFilters()));
}
- final HandleList matching = new HandleList<>();
- assertFalse(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertFalse(matching.subscriptionFound);
assertTrue(matching.isEmpty());
}
@@ -141,8 +144,9 @@ void subscribe_nonMatchingTopicFilters_doNotMatch_noFlow(
flows.subscribe(MqttTopicFilterImpl.of(notMatchingTopicFilter), null);
}
- final HandleList matching = new HandleList<>();
- assertFalse(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertFalse(matching.subscriptionFound);
assertTrue(matching.isEmpty());
}
@@ -162,8 +166,9 @@ void unsubscribe_matchingTopicFilters_doNoLongerMatch(
assertTrue(flow2.getTopicFilters().isEmpty());
assertEquals(ImmutableSet.of(flow1, flow2), ImmutableSet.copyOf(unsubscribed));
- final HandleList matching = new HandleList<>();
- assertFalse(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertFalse(matching.subscriptionFound);
assertTrue(matching.isEmpty());
}
@@ -179,8 +184,9 @@ void unsubscribe_matchingTopicFilters_doNoLongerMatch_noFlow(
flows.unsubscribe(MqttTopicFilterImpl.of(matchingTopicFilter), unsubscribed::add);
assertTrue(unsubscribed.isEmpty());
- final HandleList matching = new HandleList<>();
- assertFalse(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertFalse(matching.subscriptionFound);
assertTrue(matching.isEmpty());
}
@@ -201,8 +207,9 @@ void unsubscribe_nonMatchingTopicFilters_othersStillMatch(
assertTrue(flow2.getTopicFilters().isEmpty());
assertEquals(ImmutableSet.of(flow2), ImmutableSet.copyOf(unsubscribed));
- final HandleList matching = new HandleList<>();
- assertTrue(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertTrue(matching.subscriptionFound);
assertFalse(matching.isEmpty());
assertEquals(ImmutableSet.of(flow1), ImmutableSet.copyOf(matching));
}
@@ -220,8 +227,9 @@ void unsubscribe_nonMatchingTopicFilters_othersStillMatch_noFlow(
flows.unsubscribe(MqttTopicFilterImpl.of(notMatchingTopicFilter), unsubscribed::add);
assertTrue(unsubscribed.isEmpty());
- final HandleList matching = new HandleList<>();
- assertTrue(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertTrue(matching.subscriptionFound);
assertTrue(matching.isEmpty());
}
@@ -234,15 +242,17 @@ void cancel_doNoLongerMatch(final @NotNull String topic, final @NotNull String m
flows.subscribe(MqttTopicFilterImpl.of(matchingTopicFilter), flow2);
flows.cancel(flow1);
- HandleList matching = new HandleList<>();
- assertTrue(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertTrue(matching.subscriptionFound);
assertFalse(matching.isEmpty());
assertEquals(ImmutableSet.of(flow2), ImmutableSet.copyOf(matching));
flows.cancel(flow2);
- matching = new HandleList<>();
- assertTrue(flows.findMatching(MqttTopicImpl.of(topic), matching));
- assertTrue(matching.isEmpty());
+ final MqttMatchingPublishFlows matching2 = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching2);
+ assertTrue(matching2.subscriptionFound);
+ assertTrue(matching2.isEmpty());
}
@ParameterizedTest
@@ -253,12 +263,30 @@ void cancel_notPresentFlows_areIgnored(final @NotNull String topic, final @NotNu
flows.subscribe(MqttTopicFilterImpl.of(matchingTopicFilter), flow1);
flows.cancel(flow2);
- final HandleList matching = new HandleList<>();
- assertTrue(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertTrue(matching.subscriptionFound);
assertFalse(matching.isEmpty());
assertEquals(ImmutableSet.of(flow1), ImmutableSet.copyOf(matching));
}
+ @Test
+ void cancel_partiallyUnsubscribedFlow() {
+ final MqttSubscribedPublishFlow flow = mockSubscriptionFlow("test/topic(2)");
+ flows.subscribe(MqttTopicFilterImpl.of("test/topic"), flow);
+ flows.subscribe(MqttTopicFilterImpl.of("test/topic2"), flow);
+
+ flows.unsubscribe(MqttTopicFilterImpl.of("test/topic"), null);
+ flows.cancel(flow);
+
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of("test/topic"), matching);
+ assertFalse(matching.subscriptionFound);
+ flows.findMatching(MqttTopicImpl.of("test/topic2"), matching);
+ assertTrue(matching.subscriptionFound);
+ assertTrue(matching.isEmpty());
+ }
+
@ParameterizedTest
@CsvSource({
"1/a, 1/a, 2/a, 2/a", "1/a, 1/+, 2/a, 2/+", "1/a, 1/#, 2/a, 2/#", "1/a/b, 1/a/b, 2/a/b, 2/a/b",
@@ -279,12 +307,14 @@ void remove(
flows.remove(topicFilter, flow);
assertEquals(ImmutableSet.of(topicFilter2), ImmutableSet.copyOf(flow.getTopicFilters()));
- final HandleList matching = new HandleList<>();
- assertFalse(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertFalse(matching.subscriptionFound);
assertTrue(matching.isEmpty());
- final HandleList matching2 = new HandleList<>();
- assertTrue(flows.findMatching(MqttTopicImpl.of(topic2), matching2));
+ final MqttMatchingPublishFlows matching2 = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic2), matching2);
+ assertTrue(matching2.subscriptionFound);
assertFalse(matching2.isEmpty());
assertEquals(ImmutableSet.of(flow), ImmutableSet.copyOf(matching2));
}
@@ -304,12 +334,14 @@ void remove_noFlow(
flows.remove(MqttTopicFilterImpl.of(matchingTopicFilter), null);
- final HandleList matching = new HandleList<>();
- assertFalse(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertFalse(matching.subscriptionFound);
assertTrue(matching.isEmpty());
- final HandleList matching2 = new HandleList<>();
- assertTrue(flows.findMatching(MqttTopicImpl.of(topic2), matching2));
+ final MqttMatchingPublishFlows matching2 = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic2), matching2);
+ assertTrue(matching2.subscriptionFound);
assertTrue(matching2.isEmpty());
}
@@ -328,8 +360,9 @@ void remove_doesNotUnsubscribe(final @NotNull String topic, final @NotNull Strin
assertTrue(flow1.getTopicFilters().isEmpty());
assertEquals(ImmutableSet.of(topicFilter), ImmutableSet.copyOf(flow2.getTopicFilters()));
- final HandleList matching = new HandleList<>();
- assertTrue(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertTrue(matching.subscriptionFound);
assertFalse(matching.isEmpty());
assertEquals(ImmutableSet.of(flow2), ImmutableSet.copyOf(matching));
}
@@ -342,11 +375,63 @@ void remove_doesNotUnsubscribe_noFlow(final @NotNull String topic, final @NotNul
flows.remove(MqttTopicFilterImpl.of(matchingTopicFilter), null);
- final HandleList matching = new HandleList<>();
- assertTrue(flows.findMatching(MqttTopicImpl.of(topic), matching));
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertTrue(matching.subscriptionFound);
assertTrue(matching.isEmpty());
}
+ @ParameterizedTest
+ @CsvSource({"a/b, a/b/c, +/b/c, +/+/+", "a/b/c/d, a/b/c/d/e, +/b//d/ec, +/+/+/+/+"})
+ void findMatching_matchingMultipleButNotAllLevels(
+ final @NotNull String topic, final @NotNull String filter1, final @NotNull String filter2,
+ final @NotNull String filter3) {
+
+ flows.subscribe(MqttTopicFilterImpl.of(filter1), null);
+ flows.subscribe(MqttTopicFilterImpl.of(filter2), null);
+ flows.subscribe(MqttTopicFilterImpl.of(filter3), null);
+
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of(topic), matching);
+ assertFalse(matching.subscriptionFound);
+ assertTrue(matching.isEmpty());
+ }
+
+ @Test
+ void clear() {
+ final MqttSubscribedPublishFlow flow1 = mockSubscriptionFlow("test/topic/filter");
+ final MqttSubscribedPublishFlow flow2 = mockSubscriptionFlow("test2/topic/filter");
+ final MqttSubscribedPublishFlow flow3 = mockSubscriptionFlow("test/topic2/filter");
+ final MqttSubscribedPublishFlow flow4 = mockSubscriptionFlow("test/topic/filter2");
+ final MqttSubscribedPublishFlow flow5 = mockSubscriptionFlow("+/topic");
+ final MqttSubscribedPublishFlow flow6 = mockSubscriptionFlow("topic/#");
+ flows.subscribe(MqttTopicFilterImpl.of(flow1.toString()), flow1);
+ flows.subscribe(MqttTopicFilterImpl.of(flow2.toString()), flow2);
+ flows.subscribe(MqttTopicFilterImpl.of(flow3.toString()), flow3);
+ flows.subscribe(MqttTopicFilterImpl.of(flow4.toString()), flow4);
+ flows.subscribe(MqttTopicFilterImpl.of(flow5.toString()), flow5);
+ flows.subscribe(MqttTopicFilterImpl.of(flow6.toString()), flow6);
+
+ final Exception cause = new Exception("test");
+ flows.clear(cause);
+ verify(flow1).onError(cause);
+ verify(flow2).onError(cause);
+ verify(flow3).onError(cause);
+ verify(flow4).onError(cause);
+ verify(flow5).onError(cause);
+ verify(flow6).onError(cause);
+
+ final MqttMatchingPublishFlows matching = new MqttMatchingPublishFlows();
+ flows.findMatching(MqttTopicImpl.of("test/topic"), matching);
+ flows.findMatching(MqttTopicImpl.of("test/topic/filter"), matching);
+ flows.findMatching(MqttTopicImpl.of("test2/topic/filter"), matching);
+ flows.findMatching(MqttTopicImpl.of("test/topic2/filter"), matching);
+ flows.findMatching(MqttTopicImpl.of("test/topic/filter2"), matching);
+ flows.findMatching(MqttTopicImpl.of("abc/topic"), matching);
+ flows.findMatching(MqttTopicImpl.of("topic/abc"), matching);
+ assertFalse(matching.subscriptionFound);
+ }
+
@NotNull
private static MqttSubscribedPublishFlow mockSubscriptionFlow(final @NotNull String name) {
final MqttSubscribedPublishFlow flow = mock(MqttSubscribedPublishFlow.class);
diff --git a/src/test/java/com/hivemq/client/internal/util/ByteArrayTest.java b/src/test/java/com/hivemq/client/internal/util/ByteArrayTest.java
new file mode 100644
index 000000000..47222fb56
--- /dev/null
+++ b/src/test/java/com/hivemq/client/internal/util/ByteArrayTest.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2018 dc-square and the HiveMQ MQTT Client Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.hivemq.client.internal.util;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.jupiter.api.Test;
+
+/**
+ * @author Silvio Giebl
+ */
+class ByteArrayTest {
+
+ @Test
+ void equals() {
+ EqualsVerifier.forClass(ByteArray.class).verify();
+ }
+}
\ No newline at end of file