Skip to content

MQTT Topic Filter Support #2690

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"type": "feature",
"description": "Adds MQTT topic filter parsing and conflict support; fixes a parsing bug that treated trailing and preceding empty levels incorrectly",
"pull_requests": []
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,62 @@ private Topic(String topic, List<Level> levels) {
}

/**
* Parses an MQTT topic and labels.
* Parses a string into an MQTT topic, including substitution labels.
*
* @param topic Topic to parse.
* @param topic string to parse into a modeled MQTT topic
* @return Returns the parsed topic.
* @throws TopicSyntaxException if the topic is malformed.
*/
public static Topic parse(String topic) {
return parse(TopicRule.TOPIC, topic);
}

/**
* Parses a string into an MQTT topic, including substitution labels.
*
* @param rule MQTT-specific rule to apply to the parsing
* @param topic string to parse into a modeled MQTT topic
* @return Returns the parsed topic.
* @throws TopicSyntaxException if the topic is malformed.
*/
public static Topic parse(TopicRule rule, String topic) {
List<Level> levels = new ArrayList<>();
Set<String> labels = new HashSet<>();

for (String level : topic.split("/")) {
if (level.contains("#") || level.contains("+")) {
if (topic.isEmpty()) {
throw new TopicSyntaxException("Topics and topic filters may not be empty");
}

boolean hasFullWildcard = false;

// use negative limit to allow zero-length captures which matches MQTT specification behavior
for (String level : topic.split("/", -1)) {
if (hasFullWildcard) {
throw new TopicSyntaxException(format(
"Wildcard levels are not allowed in MQTT topics. Found `%s` in `%s`",
"A full wildcard must be the last segment in a topic filter. Found `%s` in `%s`",
level,
topic));
} else if (level.startsWith("{") && level.endsWith("}")) {
}

if (level.contains("#") || level.contains("+")) {
if (rule == TopicRule.TOPIC) {
throw new TopicSyntaxException(format(
"Wildcard levels are not allowed in MQTT topics. Found `%s` in `%s`",
level,
topic));
} else if (level.length() > 1) {
throw new TopicSyntaxException(format(
"A wildcard must be the entire topic segment. Found `%s` in `%s`",
level,
topic));
}

if (level.equals("#")) {
hasFullWildcard = true;
}
}

if (level.startsWith("{") && level.endsWith("}")) {
String label = level.substring(1, level.length() - 1);
if (!LABEL_PATTERN.matcher(label).matches()) {
throw new TopicSyntaxException(format(
Expand Down Expand Up @@ -119,11 +158,27 @@ public boolean conflictsWith(Topic other) {
for (int i = 0; i < minSize; i++) {
Level thisLevel = levels.get(i);
Level otherLevel = other.levels.get(i);

String thisValue = thisLevel.getContent();
String otherValue = otherLevel.getContent();

// multi-level wildcard will conflict regardless of what the other level is
if (thisValue.equals("#") || otherValue.equals("#")) {
return true;
}

// single-level wildcard is a level match regardless of the other level
if (thisValue.equals("+") || otherValue.equals("+")) {
continue;
}

// Both are static levels with different values.
if (!thisLevel.isLabel() && !otherLevel.isLabel()
&& !thisLevel.getContent().equals(otherLevel.getContent())) {
&& !thisValue.equals(otherValue)) {
return false;
} else if (thisLevel.isLabel() != otherLevel.isLabel()) {
}

if (thisLevel.isLabel() != otherLevel.isLabel()) {
// One is static and the other is not, so there is not a
// conflict. One is more specific than the other.
return false;
Expand Down Expand Up @@ -222,4 +277,21 @@ public int hashCode() {
return Objects.hash(isLabel, value);
}
}

/**
* Controls the rules for how a value is parsed into a topic.
*/
public enum TopicRule {

/**
* Treat the value as a basic topic. Wildcards are not allowed.
*/
TOPIC,

/**
* Treat the value as a topic filter. Single and multi-level wildcards are allowed if they follow the
* MQTT specification properly.
*/
FILTER
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

public class TopicConflictTest {
@ParameterizedTest
@MethodSource("data")
public void patternConflicts(String topicA, String topicB, boolean isConflicting) {
Topic a = Topic.parse(topicA);
Topic b = Topic.parse(topicB);
@MethodSource("topicCases")
public void topicPatternConflicts(String topicA, String topicB, boolean isConflicting) {
Topic a = Topic.parse(Topic.TopicRule.TOPIC, topicA);
Topic b = Topic.parse(Topic.TopicRule.TOPIC, topicB);

if (a.conflictsWith(b) != isConflicting) {
if (isConflicting) {
Expand All @@ -26,7 +28,7 @@ public void patternConflicts(String topicA, String topicB, boolean isConflicting
}
}

public static Collection<Object[]> data() {
public static Collection<Object[]> topicCases() {
return Arrays.asList(new Object[][] {
// No conflict because a is more specific.
{"a", "{x}", false},
Expand All @@ -53,7 +55,81 @@ public static Collection<Object[]> data() {
// No conflict
{"a/b/c/d", "a/{b}/c/{d}", false},
// No conflict.
{"$aws/things/{thingName}/jobs/get", "$aws/things/{thingName}/jobs/start-next", false}
{"$aws/things/{thingName}/jobs/get", "$aws/things/{thingName}/jobs/start-next", false},
// No conflict, empty second level creates mismatch with single-level topic
{"a/", "a", false}
});
}

@ParameterizedTest
@MethodSource("topicFilterCases")
public void topicFilterPatternConflicts(String topicA, String topicB, boolean isConflicting) {
Topic a = Topic.parse(Topic.TopicRule.FILTER, topicA);
Topic b = Topic.parse(Topic.TopicRule.FILTER, topicB);

if (a.conflictsWith(b) != isConflicting) {
if (isConflicting) {
List<String> aLevels = a.getLevels().stream().map(Topic.Level::toString).collect(Collectors.toList());
String aMarkedTopic = String.join("@", aLevels);

List<String> bLevels = b.getLevels().stream().map(Topic.Level::toString).collect(Collectors.toList());
String bMarkedTopic = String.join("@", bLevels);

Assertions.fail("Expected conflict between `" + aMarkedTopic + "` and `" + bMarkedTopic + "`");
} else {
Assertions.fail("Unexpected conflict between `" + a + "` and `" + b + "`");
}
}
}

public static Collection<Object[]> topicFilterCases() {
return Arrays.asList(new Object[][] {
// No conflict because a is more specific.
{"a", "{x}", false},
// No conflict because "a" is more specific than "{y}".
{"a/{x}", "{y}/a", false},
// No conflict because "a" is more specific than "{x}".
{"{x}/a", "a/{y}", false},
// Conflicts because the topics are equivalent and the same length.
{"a/{x}", "a/{y}", true},
// Does not conflict because "{x}" and "{y}" are under different level prefixes.
{"a/{x}", "b/{y}", false},
// Conflicts because they have the same levels and the same length.
{"a/{x}/b", "a/{y}/b", true},
// Does not conflict because one is longer than the other.
{"a/{x}/b", "a/{y}/b/{z}", false},
// Does not conflict because one is longer than the other.
{"a/{x}/b", "a/{y}/b/{z}/c", false},
// Do not conflict because "b" is more specific than "{b}"
{"a/b/c", "a/{b}/c", false},
// Conflicts because they are all labels at the same level.
{"{a}/{b}/{c}", "{x}/{y}/{z}", true},
// No conflicts because one is longer than the other.
{"{a}/{b}/{c}", "{x}/{y}/{z}/{a}", false},
// No conflict
{"a/b/c/d", "a/{b}/c/{d}", false},
// No conflict.
{"$aws/things/{thingName}/jobs/get", "$aws/things/{thingName}/jobs/start-next", false},
// Conflicts because multi-level wild card matches rest of path
{"a/#", "a/b/c/d", true},
// Conflicts becase single-level wild card matches segment
{"a/+/c", "a/b/c", true},
// Conflicts becase single-level wild card matches label segment
{"a/{b}/c", "a/+/c", true},
// No conflict because single-level wildcard doesn't match multi-segment "b/c"
{"a/+/c", "a/b/c/d", false},
// Conflicts because '#' matches everything
{"#", "/", true},
// Conflicts because '#' matches everything
{"+/a", "#", true},
// Conflicts because 'a/a' matches both
{"+/a", "a/+", true},
// Conflicts because single-level wildcard matches empty segments
{"+/+", "/", true},
// Conflict because wildcard matches empty level
{"/", "+/", true},
// Conflict because wildcard matches empty level
{"/+", "/", true},
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,73 @@
public class TopicTest {
@Test
public void requiresThatLabelsSpanWholeLevel() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse("foo/bar/{baz}bam"));
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.TOPIC, "foo/bar/{baz}bam"));
}

@Test
public void requiresThatLabelsContainOneCharacter() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse("foo/bar/{}"));
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.TOPIC, "foo/bar/{}"));
}

@Test
public void requiresThatLabelsContainValidCharacters() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse("foo/bar/{nope nope}"));
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.TOPIC, "foo/bar/{nope nope}"));
}

@Test
public void doesNotAllowDuplicateLabels() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse("foo/bar/{nope}/{nope}"));
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.TOPIC, "foo/bar/{nope}/{nope}"));
}

@Test
public void doesNotSupportSingleLevelWildCards() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse("foo/bar/+/nope"));
public void doesNotSupportSingleLevelWildCardsOnTopics() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.TOPIC, "foo/bar/+/nope"));
}

@Test
public void doesNotSupportMultiLevelWildCards() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse("foo/bar/nope/#"));
public void doesNotSupportMultiLevelWildCardsOnTopics() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.TOPIC, "foo/bar/nope/#"));
}

@Test
public void detectsLabelSyntaxError() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse("foo/bar/nope/}"));
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.TOPIC, "foo/bar/nope/}"));
}

@Test
public void doesNotAllowEmpty() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.TOPIC, ""));
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.FILTER, ""));
}

@Test
public void doesNotAllowSingleLevelWildcardInTopic() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.TOPIC, "test/+/bar"));
}

@Test
public void doesNotAllowMultiLevelWildcardInTopic() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.TOPIC, "test/#"));
}

@Test
public void doesNotAllowMixedSingleLevelWildcard() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.FILTER, "test/+d/bar"));
}

@Test
public void doesNotAllowMixedMultiLevelWildcard() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.FILTER, "test/uff#dah/bar"));
}

@Test
public void doesNotAllowSegmentsAfterMultiLevelWildcardTopicFilter() {
assertThrows(TopicSyntaxException.class, () -> Topic.parse(Topic.TopicRule.FILTER, "test/#/bar"));
}

@Test
public void parsesTopicWithNoLabels() {
Topic topic = Topic.parse("foo/bar/baz");
Topic topic = Topic.parse(Topic.TopicRule.TOPIC, "foo/bar/baz");

assertThat(topic.toString(), equalTo("foo/bar/baz"));
assertThat(topic.getLevels(),
Expand All @@ -68,9 +99,56 @@ public void parsesTopicWithNoLabels() {
assertThat(topic, equalTo(topic));
}

@Test
public void parsesTopicFilterWithNoLabels() {
Topic topic = Topic.parse(Topic.TopicRule.FILTER, "foo/+/baz/#");

assertThat(topic.toString(), equalTo("foo/+/baz/#"));
assertThat(topic.getLevels(),
contains(
new Topic.Level("foo"),
new Topic.Level("+"),
new Topic.Level("baz"),
new Topic.Level("#")));
assertThat(topic.conflictsWith(topic), is(true));
assertThat(topic.getLabels(), empty());
assertFalse(topic.hasLabel("foo"));
assertThat(topic, equalTo(topic));
}

@Test
public void parsesSingleSlashTopic() {
Topic topic = Topic.parse(Topic.TopicRule.FILTER, "/");

assertThat(topic.toString(), equalTo("/"));
assertThat(topic.getLevels().size(), is(2));
assertThat(topic.getLevels().get(0).getContent(), equalTo(""));
assertThat(topic.getLevels().get(1).getContent(), equalTo(""));
}

@Test
public void parsesTopicWithTrailingSlash() {
Topic topic = Topic.parse(Topic.TopicRule.FILTER, "hello/");

assertThat(topic.toString(), equalTo("hello/"));
assertThat(topic.getLevels().size(), is(2));
assertThat(topic.getLevels().get(0).getContent(), equalTo("hello"));
assertThat(topic.getLevels().get(1).getContent(), equalTo(""));
}

@Test
public void parsesTopicWithEmptyTopLevel() {
Topic topic = Topic.parse(Topic.TopicRule.FILTER, "/world");

assertThat(topic.toString(), equalTo("/world"));
assertThat(topic.getLevels().size(), is(2));
assertThat(topic.getLevels().get(0).getContent(), equalTo(""));
assertThat(topic.getLevels().get(1).getContent(), equalTo("world"));
}

@Test
public void parsesTopicWithLabels() {
Topic topic = Topic.parse("foo/{foo}/bar/{baz}");
Topic topic = Topic.parse(Topic.TopicRule.TOPIC, "foo/{foo}/bar/{baz}");

assertThat(topic, equalTo(topic));
assertThat(topic.toString(), equalTo("foo/{foo}/bar/{baz}"));
Expand All @@ -93,8 +171,8 @@ public void parsesTopicWithLabels() {

@Test
public void topicEquality() {
Topic topic1 = Topic.parse("foo/bar");
Topic topic2 = Topic.parse("foo/{bar}");
Topic topic1 = Topic.parse(Topic.TopicRule.TOPIC, "foo/bar");
Topic topic2 = Topic.parse(Topic.TopicRule.TOPIC, "foo/{bar}");

assertThat(topic1, equalTo(topic1));
assertThat(topic1, not(equalTo(topic2)));
Expand Down