Skip to content

Commit

Permalink
Implements reserved topic matching prescriptions (moquette-io#793)
Browse files Browse the repository at this point in the history
Updates the mathcing algorithm inside CTrie to negatively match topic names in the reserved form (start with $) against wildcards position as first token of a topic filter.
Refactored the tests to reuse integration fixture also outside the MQTT5 test suite, because this change regards all protocols starting from MQTT3.1.1.
  • Loading branch information
andsel authored Nov 11, 2023
1 parent c8258d1 commit 6a7bc49
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 45 deletions.
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[fix] Implements requirements on reserved topics (starts with $). Implements the matching rules and avoid to proceed with processing on client's publishes on those topics (#793)
[feature] Handle will delay interval and MQTT5's Will optional properties (#770)
[fix] Handle empty collector batches in PostOffice (#777)
[feature] Introduce a new option "PEER_CERTIFICATE_AS_USERNAME". When enabled, the client certificate is provided as the username to the authenticator.
Expand Down
8 changes: 7 additions & 1 deletion broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -608,15 +608,21 @@ PostOffice.RouteResult processPublish(MqttPublishMessage msg) {
dropConnection();
}

if (!topic.isEmpty() && topic.headToken().isReserved()) {
LOG.warn("Avoid to publish on topic which contains reserved topic (starts with $)");
return PostOffice.RouteResult.failed(clientId);
}

// retain else msg is cleaned by the NewNettyMQTTHandler and is not available
// in execution by SessionEventLoop
msg.retain();
switch (qos) {
case AT_MOST_ONCE:
return postOffice.routeCommand(clientId, "PUB QoS0", () -> {
checkMatchSessionLoop(clientId);
if (!isBoundToSession())
if (!isBoundToSession()) {
return null;
}
postOffice.receivedPublishQos0(topic, username, clientId, msg);
return null;
}).ifFailed(msg::release);
Expand Down
41 changes: 27 additions & 14 deletions broker/src/main/java/io/moquette/broker/subscriptions/CTrie.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,55 +50,68 @@ enum NavigationAction {
MATCH, GODEEP, STOP
}

private NavigationAction evaluate(Topic topic, CNode cnode) {
private NavigationAction evaluate(Topic topicName, CNode cnode, int depth) {
// depth 0 is the root node of all the topics, so for topic filter
// monitor/sensor we have <root> -> monitor -> sensor
final boolean isFirstLevel = depth == 1;
if (Token.MULTI.equals(cnode.getToken())) {
Token token = topicName.headToken();
if (token != null && token.isReserved() && isFirstLevel) {
// [MQTT-4.7.2-1] single wildcard can't match reserved topics
// if reserved token is the first of the topicName
return NavigationAction.STOP;
}
return NavigationAction.MATCH;
}
if (topic.isEmpty()) {
if (topicName.isEmpty()) {
return NavigationAction.STOP;
}
final Token token = topic.headToken();
if (!(Token.SINGLE.equals(cnode.getToken()) || cnode.getToken().equals(token) || ROOT.equals(cnode.getToken()))) {
return NavigationAction.STOP;
final Token token = topicName.headToken();
if (Token.SINGLE.equals(cnode.getToken()) || cnode.getToken().equals(token) || ROOT.equals(cnode.getToken())) {
if (Token.SINGLE.equals(cnode.getToken()) && token.isReserved() && isFirstLevel) {
// [MQTT-4.7.2-1] single wildcard can't match reserved topics
return NavigationAction.STOP;
}
return NavigationAction.GODEEP;
}
return NavigationAction.GODEEP;
return NavigationAction.STOP;
}

public List<Subscription> recursiveMatch(Topic topic) {
return recursiveMatch(topic, this.root);
public List<Subscription> recursiveMatch(Topic topicName) {
return recursiveMatch(topicName, this.root, 0);
}

private List<Subscription> recursiveMatch(Topic topic, INode inode) {
private List<Subscription> recursiveMatch(Topic topicName, INode inode, int depth) {
CNode cnode = inode.mainNode();
if (cnode instanceof TNode) {
return Collections.emptyList();
}
NavigationAction action = evaluate(topic, cnode);
NavigationAction action = evaluate(topicName, cnode, depth);
if (action == NavigationAction.MATCH) {
return cnode.subscriptions;
}
if (action == NavigationAction.STOP) {
return Collections.emptyList();
}
Topic remainingTopic = (ROOT.equals(cnode.getToken())) ? topic : topic.exceptHeadToken();
Topic remainingTopic = (ROOT.equals(cnode.getToken())) ? topicName : topicName.exceptHeadToken();
List<Subscription> subscriptions = new ArrayList<>();

// We should only consider the maximum three children children of
// type #, + or exact match
Optional<INode> subInode = cnode.childOf(Token.MULTI);
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get()));
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
}
subInode = cnode.childOf(Token.SINGLE);
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get()));
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
}
if (remainingTopic.isEmpty()) {
subscriptions.addAll(cnode.subscriptions);
} else {
subInode = cnode.childOf(remainingTopic.headToken());
if (subInode.isPresent()) {
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get()));
subscriptions.addAll(recursiveMatch(remainingTopic, subInode.get(), depth + 1));
}
}
return subscriptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,18 @@ Optional<CNode> lookup(Topic topic) {
* contain character # and + because they are reserved to listeners subscriptions, and not topic
* publishing.
*
* @param topic
* to use fo searching matching subscriptions.
* @param topicName
* to use for search matching subscriptions.
* @return the list of matching subscriptions, or empty if not matching.
*/
@Override
public List<Subscription> matchWithoutQosSharpening(Topic topic) {
return ctrie.recursiveMatch(topic);
public List<Subscription> matchWithoutQosSharpening(Topic topicName) {
return ctrie.recursiveMatch(topicName);
}

@Override
public List<Subscription> matchQosSharpening(Topic topic) {
final List<Subscription> subscriptions = matchWithoutQosSharpening(topic);
public List<Subscription> matchQosSharpening(Topic topicName) {
final List<Subscription> subscriptions = matchWithoutQosSharpening(topicName);

Map<String, Subscription> subsGroupedByClient = new HashMap<>();
for (Subscription sub : subscriptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.List;
import java.util.Set;

/**
* Contains all topic filters that are used to match against topic names.
* */
public interface ISubscriptionsDirectory {

void init(ISubscriptionsRepository sessionsRepository);
Expand Down
19 changes: 7 additions & 12 deletions broker/src/main/java/io/moquette/broker/subscriptions/Token.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,6 @@ protected String name() {
return name;
}

protected boolean match(Token t) {
if (MULTI.equals(t) || SINGLE.equals(t)) {
return false;
}

if (MULTI.equals(this) || SINGLE.equals(this)) {
return true;
}

return equals(t);
}

@Override
public int hashCode() {
int hash = 7;
Expand Down Expand Up @@ -86,4 +74,11 @@ public int compareTo(Token other) {
}
return name.compareTo(other.name);
}

/**
* Token which starts with $ is reserved
* */
public boolean isReserved() {
return name.startsWith("$");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,26 +171,39 @@ public void rogerLightTopicMatches() {
assertMatch("foo/bar/+", "foo/bar/");
}

private void assertMatch(String s, String t) {
@Test
public void givenTopicFilterStartingWithSingleWildcardDoesntMatchSpecialTopicNames() {
assertNotMatch("+/monitor/clients", "$SYS/monitor/clients");
assertMatch("outer/+/inner", "outer/$something/inner");
assertMatch("$SYS/monitor/+", "$SYS/monitor/clients");
}

@Test
public void givenTopicFilterStartingWithMultiWildcardDoesntMatchSpecialTopicNames() {
assertNotMatch("#", "$SYS/monitor/clients");
assertMatch("$SYS/#", "$SYS");
}

private void assertMatch(String topicFilter, String topicName) {
sut = new CTrieSubscriptionDirectory();
ISubscriptionsRepository sessionsRepository = new MemorySubscriptionsRepository();
sut.init(sessionsRepository);

Subscription sub = clientSubOnTopic("AnySensor1", s);
Subscription sub = clientSubOnTopic("AnySensor1", topicFilter);
sut.add(sub);

assertThat(sut.matchWithoutQosSharpening(asTopic(t))).isNotEmpty();
assertThat(sut.matchWithoutQosSharpening(asTopic(topicName))).isNotEmpty();
}

private void assertNotMatch(String subscription, String topic) {
private void assertNotMatch(String topicFilter, String topicName) {
sut = new CTrieSubscriptionDirectory();
ISubscriptionsRepository sessionsRepository = new MemorySubscriptionsRepository();
sut.init(sessionsRepository);

Subscription sub = clientSubOnTopic("AnySensor1", subscription);
Subscription sub = clientSubOnTopic("AnySensor1", topicFilter);
sut.add(sub);

assertThat(sut.matchWithoutQosSharpening(asTopic(topic))).isEmpty();
assertThat(sut.matchWithoutQosSharpening(asTopic(topicName))).isEmpty();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public void testAddNewDeepNodes() {
assertTrue(subscriptions.contains(happinessSensor));
}

static Subscription clientSubOnTopic(String clientID, String topicName) {
return new Subscription(clientID, asTopic(topicName), null);
static Subscription clientSubOnTopic(String clientID, String topicFilter) {
return new Subscription(clientID, asTopic(topicFilter), null);
}

@Test
Expand Down
85 changes: 85 additions & 0 deletions broker/src/test/java/io/moquette/integration/PublishTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package io.moquette.integration;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAckReturnCode;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.moquette.integration.mqtt5.AbstractServerIntegrationTest;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

public class PublishTest extends AbstractServerIntegrationTest {
@Override
public String clientName() {
return "Publisher";
}

@Test
public void givenAConnectedClientWhenItPublishToReservedTopicNoPublishIsForwarded() throws InterruptedException {
// given a connected client
Mqtt3BlockingClient publisher = createHiveMqV3Client("publisher");

Mqtt3BlockingClient subscriber = createHiveMqV3Client("subscriber");

subscriber.subscribeWith()
.topicFilter("$sys/monitor")
.qos(MqttQos.AT_MOST_ONCE)
.send();

// when send a publish to reserved topic
publisher.publishWith()
.topic("$sys/monitor")
.payload("Something new".getBytes(StandardCharsets.UTF_8))
.send();

// then no message is published
verifyNoMessageIsReceived(subscriber, Duration.ofSeconds(2));
}

@NotNull
private static Mqtt3BlockingClient createHiveMqV3Client(String clientId) {
Mqtt3BlockingClient publisher = MqttClient.builder()
.useMqttVersion3()
.identifier(clientId)
.serverHost("localhost")
.serverPort(1883)
.buildBlocking();
Mqtt3ConnAck connectAck = publisher.connect();
assertEquals(Mqtt3ConnAckReturnCode.SUCCESS, connectAck.getReturnCode(), "Accept plain connection");
return publisher;
}

private static void verifyMessageIsReceived(String expectedPayload, Mqtt3BlockingClient client, Duration timeout) throws InterruptedException {
try (Mqtt3BlockingClient.Mqtt3Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt3Publish> publishMessage = publishes.receive(timeout.getSeconds(), TimeUnit.SECONDS);
final String payload = publishMessage.map(Mqtt3Publish::getPayloadAsBytes)
.map(b -> new String(b, StandardCharsets.UTF_8))
.orElse("Failed to load payload");
assertEquals(expectedPayload, payload, "Something new has to be received");
}
}

private static void verifyNoMessageIsReceived(Mqtt3BlockingClient testamentSubscriber, Duration timeout) throws InterruptedException {
try (Mqtt3BlockingClient.Mqtt3Publishes publishes = testamentSubscriber.publishes(MqttGlobalPublishFilter.ALL)) {
Optional<Mqtt3Publish> publishedWill = publishes.receive(timeout.getSeconds(), TimeUnit.SECONDS);

// verify no published will in 10 seconds
assertFalse(publishedWill.isPresent(), "No message should be published");
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.nio.file.Path;
import java.util.Properties;

abstract class AbstractServerIntegrationTest {
public abstract class AbstractServerIntegrationTest {
Server broker;
IConfig config;

Expand All @@ -26,7 +26,7 @@ abstract class AbstractServerIntegrationTest {

Client lowLevelClient;

abstract String clientName();
public abstract String clientName();

protected void startServer(String dbPath) throws IOException {
broker = new Server();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ConnectAckTest extends AbstractServerIntegrationTest {
private MqttConnAckMessage connAck;

@Override
String clientName() {
public String clientName() {
return "client";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void tearDown() throws Exception {
}

@Override
String clientName() {
public String clientName() {
return "subscriber";
}

Expand Down

0 comments on commit 6a7bc49

Please sign in to comment.