Skip to content

Commit

Permalink
Initial implementation of shared subscriptions (moquette-io#796)
Browse files Browse the repository at this point in the history
Contains a series of updates to the broker internal structures to sustain shared subscription registering and dispatching of publish messages.

In CTrie's CNode add a separate shared subscriptions data structure to contains the shared subscriptions list related to share name for a given topic filter. It's implemented as a map keyed by share name which maps to a list of shared subscriptions.
Updated the CTrie's insertion method to use a data class SubscriptionRequest to capture the shared and non shared subscription actions.
Implemented test cases to cover the various MQTT spec requirements as integrations tests, and adapted utility classes (like the low level Client to permit the verification).
  • Loading branch information
andsel authored Dec 16, 2023
1 parent 39b1688 commit b2c5198
Show file tree
Hide file tree
Showing 30 changed files with 1,211 additions and 295 deletions.
2 changes: 2 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
Version 0.18-SNAPSHOT:
[feature] shared subscriptions:
- Initial implementation of shared subscription subscribe and publish part. (#796)
[fix] Implements requirements on reserved topics (starts with $). Implements the matching rules and avoid to proceed with processing on client's publishes on those topics (#793)
[feature] Handle will delay interval and MQTT5's Will optional properties (#770)
[fix] Handle empty collector batches in PostOffice (#777)
Expand Down
2 changes: 1 addition & 1 deletion broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
https://github.com/netty/netty/blob/netty-4.1.100.Final/pom.xml#L650 -->
<netty.tcnative.version>2.0.61.Final</netty.tcnative.version>
<paho.version>1.2.5</paho.version>
<hivemqclient.version>1.3.0</hivemqclient.version>
<hivemqclient.version>1.3.3</hivemqclient.version>
<h2.version>2.1.212</h2.version>
</properties>

Expand Down
61 changes: 42 additions & 19 deletions broker/src/main/java/io/moquette/broker/Authorizator.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import static io.moquette.broker.Utils.messageId;
import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
Expand All @@ -39,6 +40,18 @@ final class Authorizator {
this.policy = policy;
}


List<MqttTopicSubscription> verifyAlsoSharedTopicsReadAccess(String clientID, String username, MqttSubscribeMessage msg) {
return verifyTopicsReadAccessWithTopicExtractor(clientID, username, msg, Authorizator::extractShareTopic);
}

private static Topic extractShareTopic(String s) {
if (SharedSubscriptionUtils.isSharedSubscription(s)) {
return Topic.asTopic(SharedSubscriptionUtils.extractFilterFromShared(s));
}
return Topic.asTopic(s);
}

/**
* @param clientID
* the clientID
Expand All @@ -49,33 +62,43 @@ final class Authorizator {
* @return the list of verified topics for the given subscribe message.
*/
List<MqttTopicSubscription> verifyTopicsReadAccess(String clientID, String username, MqttSubscribeMessage msg) {
return verifyTopicsReadAccessWithTopicExtractor(clientID, username, msg, Topic::asTopic);
}

private List<MqttTopicSubscription> verifyTopicsReadAccessWithTopicExtractor(String clientID, String username,
MqttSubscribeMessage msg, Function<String, Topic> topicExtractor) {
List<MqttTopicSubscription> ackTopics = new ArrayList<>();

final int messageId = messageId(msg);
for (MqttTopicSubscription req : msg.payload().topicSubscriptions()) {
Topic topic = new Topic(req.topicName());
if (!policy.canRead(topic, username, clientID)) {
// send SUBACK with 0x80, the user hasn't credentials to read the topic
LOG.warn("Client does not have read permissions on the topic username: {}, messageId: {}, " +
"topic: {}", username, messageId, topic);
ackTopics.add(new MqttTopicSubscription(topic.toString(), FAILURE));
} else {
MqttQoS qos;
if (topic.isValid()) {
LOG.debug("Client will be subscribed to the topic username: {}, messageId: {}, topic: {}",
username, messageId, topic);
qos = req.qualityOfService();
} else {
LOG.warn("Topic filter is not valid username: {}, messageId: {}, topic: {}",
username, messageId, topic);
qos = FAILURE;
}
ackTopics.add(new MqttTopicSubscription(topic.toString(), qos));
}
Topic topic = topicExtractor.apply(req.topicName());
final MqttQoS qos = getQoSCheckingAlsoPermissionsOnTopic(clientID, username, messageId, topic,
req.qualityOfService());
ackTopics.add(new MqttTopicSubscription(req.topicName(), qos));
}
return ackTopics;
}

private MqttQoS getQoSCheckingAlsoPermissionsOnTopic(String clientID, String username, int messageId,
Topic topic, MqttQoS requestedQoS) {
if (policy.canRead(topic, username, clientID)) {
if (topic.isValid()) {
LOG.debug("Client will be subscribed to the topic username: {}, messageId: {}, topic: {}",
username, messageId, topic);
return requestedQoS;
}

LOG.warn("Topic filter is not valid username: {}, messageId: {}, topic: {}",
username, messageId, topic);
return FAILURE;
}

// send SUBACK with 0x80, the user hasn't credentials to read the topic
LOG.warn("Client does not have read permissions on the topic username: {}, messageId: {}, " +
"topic: {}", username, messageId, topic);
return FAILURE;
}

/**
* Ask the authorization policy if the topic can be used in a publish.
*
Expand Down
10 changes: 6 additions & 4 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -690,10 +690,10 @@ void sendPublish(MqttPublishMessage publishMsg) {
final String topicName = publishMsg.variableHeader().topicName();
MqttQoS qos = publishMsg.fixedHeader().qosLevel();
if (LOG.isTraceEnabled()) {
LOG.trace("Sending PUBLISH({}) message. MessageId={}, topic={}, payload={}", qos, packetId, topicName,
DebugUtils.payload2Str(publishMsg.payload()));
LOG.trace("Sending PUBLISH({}) message. MessageId={}, topic={}, payload={} to {}", qos, packetId, topicName,
DebugUtils.payload2Str(publishMsg.payload()), getClientId());
} else {
LOG.debug("Sending PUBLISH({}) message. MessageId={}, topic={}", qos, packetId, topicName);
LOG.debug("Sending PUBLISH({}) message. MessageId={}, topic={} to {}", qos, packetId, topicName, getClientId());
}
sendIfWritableElseDrop(publishMsg);
}
Expand All @@ -703,7 +703,7 @@ void sendIfWritableElseDrop(MqttMessage msg) {
LOG.debug("OUT {}", msg.fixedHeader().messageType());
}
if (channel.isWritable()) {

LOG.debug("Sending message {} on the wire to {}", msg.fixedHeader().messageType(), getClientId());
// Sending to external, retain a duplicate. Just retain is not
// enough, since the receiver must have full control.
Object retainedDup = msg;
Expand All @@ -718,6 +718,8 @@ void sendIfWritableElseDrop(MqttMessage msg) {
channelFuture = channel.write(retainedDup);
}
channelFuture.addListener(FIRE_EXCEPTION_ON_FAILURE);
} else {
LOG.debug("Dropping message {} from the wire, msg: {}", msg.fixedHeader().messageType(), msg);
}
}

Expand Down
103 changes: 58 additions & 45 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.moquette.broker;

import io.moquette.broker.scheduler.ScheduledExpirationService;
import io.moquette.broker.subscriptions.ShareName;
import io.moquette.interception.BrokerInterceptor;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.Subscription;
Expand All @@ -32,7 +33,6 @@
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.ReferenceCountUtil;
import org.apache.commons.codec.binary.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,6 +43,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -273,38 +274,77 @@ public void wipeExistingScheduledWill(String clientId) {
sessionRepository.deleteWill(clientId);
}

// Used for internal purposes of subscribeClientToTopics method
private static final class SharedSubscriptionData {
final ShareName name;
final Topic topicFilter;
final MqttQoS requestedQoS;

private SharedSubscriptionData(ShareName name, Topic topicFilter, MqttQoS requestedQoS) {
Objects.requireNonNull(name);
Objects.requireNonNull(topicFilter);
Objects.requireNonNull(requestedQoS);
this.name = name;
this.topicFilter = topicFilter;
this.requestedQoS = requestedQoS;
}

static SharedSubscriptionData fromMqttSubscription(MqttTopicSubscription sub) {
return new SharedSubscriptionData(new ShareName(SharedSubscriptionUtils.extractShareName(sub.topicName())),
Topic.asTopic(SharedSubscriptionUtils.extractFilterFromShared(sub.topicName())), sub.qualityOfService());
}
}

public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, String username,
MQTTConnection mqttConnection) {
// verify which topics of the subscribe ongoing has read access permission
int messageID = messageId(msg);
final Session session = sessionRegistry.retrieve(clientID);

final List<SharedSubscriptionData> sharedSubscriptions;

if (mqttConnection.isProtocolVersion5()) {
for (MqttTopicSubscription topicFilter : msg.payload().topicSubscriptions()) {
if (isSharedSubscription(topicFilter.topicName())) {
final String shareName = extractShareName(topicFilter.topicName());
if (!validateShareName(shareName)) {
// this is a malformed packet, MQTT-4.13.1-1, disconnect it
LOG.info("{} used an invalid shared subscription name {}, disconnecting", clientID, shareName);
session.disconnectFromBroker();
return;
}
}
sharedSubscriptions = msg.payload().topicSubscriptions().stream()
.filter(sub -> SharedSubscriptionUtils.isSharedSubscription(sub.topicName()))
.map(SharedSubscriptionData::fromMqttSubscription)
.collect(Collectors.toList());

Optional<SharedSubscriptionData> invalidSharedSubscription = sharedSubscriptions.stream()
.filter(subData -> !SharedSubscriptionUtils.validateShareName(subData.name.toString()))
.findFirst();
if (invalidSharedSubscription.isPresent()) {
// this is a malformed packet, MQTT-4.13.1-1, disconnect it
LOG.info("{} used an invalid shared subscription name {}, disconnecting", clientID, invalidSharedSubscription.get().name);
session.disconnectFromBroker();
return;
}
} else {
sharedSubscriptions = Collections.emptyList();
}

List<MqttTopicSubscription> ackTopics;
if (mqttConnection.isProtocolVersion5()) {
ackTopics = authorizator.verifyAlsoSharedTopicsReadAccess(clientID, username, msg);
} else {
ackTopics = authorizator.verifyTopicsReadAccess(clientID, username, msg);
}
List<MqttTopicSubscription> ackTopics = authorizator.verifyTopicsReadAccess(clientID, username, msg);
MqttSubAckMessage ackMessage = doAckMessageFromValidateFilters(ackTopics, messageID);

// store topics subscriptions in session
// store topics of non-shared subscriptions in session
List<Subscription> newSubscriptions = ackTopics.stream()
.filter(req -> req.qualityOfService() != FAILURE)
.map(req -> {
final Topic topic = new Topic(req.topicName());
return new Subscription(clientID, topic, req.qualityOfService());
.filter(sub -> sub.qualityOfService() != FAILURE)
.filter(sub -> !SharedSubscriptionUtils.isSharedSubscription(sub.topicName()))
.map(sub -> {
final Topic topic = new Topic(sub.topicName());
return new Subscription(clientID, topic, sub.qualityOfService());
}).collect(Collectors.toList());

for (Subscription subscription : newSubscriptions) {
subscriptions.add(subscription);
subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.getRequestedQos());
}

for (SharedSubscriptionData sharedSubData : sharedSubscriptions) {
subscriptions.addShared(clientID, sharedSubData.name, sharedSubData.topicFilter, sharedSubData.requestedQoS);
}

// add the subscriptions to Session
Expand All @@ -320,33 +360,6 @@ public void subscribeClientToTopics(MqttSubscribeMessage msg, String clientID, S
}
}

/**
* @return the share name in the topic filter of format $share/{shareName}/{topicFilter}
* */
// VisibleForTesting
protected static String extractShareName(String sharedTopicFilter) {
int afterShare = "$share/".length();
int endOfShareName = sharedTopicFilter.indexOf('/', afterShare);
return sharedTopicFilter.substring(afterShare, endOfShareName);
}

/**
* @return true if shareName is well formed, is at least one characted and doesn't contain wildcard matchers
* */
private boolean validateShareName(String shareName) {
// MQTT-4.8.2-1 MQTT-4.8.2-2, must be longer than 1 char and do not contain + or #
Objects.requireNonNull(shareName);
return shareName.length() > 0 && !shareName.contains("+") && !shareName.contains("#");
}

/**
* @return true if topic filter is shared format
* */
private static boolean isSharedSubscription(String topicFilter) {
Objects.requireNonNull(topicFilter, "topicFilter can't be null");
return topicFilter.startsWith("$share/");
}

private void publishRetainedMessagesForSubscriptions(String clientID, List<Subscription> newSubscriptions) {
Session targetSession = this.sessionRegistry.retrieve(clientID);
for (Subscription subscription : newSubscriptions) {
Expand Down
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void startServer(IConfig config) throws IOException {
* @throws IOException in case of any IO Error.
*/
public void startServer(IConfig config, List<? extends InterceptHandler> handlers) throws IOException {
LOG.debug("Starting moquette integration using IConfig instance and intercept handlers");
LOG.debug("Starting Moquette integration using IConfig instance and intercept handlers");
startServer(config, handlers, null, null, null);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) 2012-2023 The original author or authors
* ------------------------------------------------------
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/
package io.moquette.broker;

import java.util.Objects;

/**
* Utility class that collects common utils methods for shared subscription topic parsing
* */
class SharedSubscriptionUtils {

/**
* @return the share name in the topic filter of format $share/{shareName}/{topicFilter}
* */
// VisibleForTesting
protected static String extractShareName(String sharedTopicFilter) {
int afterShare = "$share/".length();
int endOfShareName = sharedTopicFilter.indexOf('/', afterShare);
return sharedTopicFilter.substring(afterShare, endOfShareName);
}

/**
* @return the filter part from full topic filter of format $share/{shareName}/{topicFilter}
* */
// VisibleForTesting
protected static String extractFilterFromShared(String fullSharedTopicFilter) {
int afterShare = "$share/".length();
int endOfShareName = fullSharedTopicFilter.indexOf('/', afterShare);
return fullSharedTopicFilter.substring(endOfShareName + 1);
}

/**
* @return true if topic filter is shared format
* */
protected static boolean isSharedSubscription(String topicFilter) {
Objects.requireNonNull(topicFilter, "topicFilter can't be null");
return topicFilter.startsWith("$share/");
}

/**
* @return true if shareName is well-formed, is at least one characted and doesn't contain wildcard matchers
* */
protected static boolean validateShareName(String shareName) {
// MQTT-4.8.2-1 MQTT-4.8.2-2, must be longer than 1 char and do not contain + or #
Objects.requireNonNull(shareName);
return shareName.length() > 0 && !shareName.contains("+") && !shareName.contains("#");
}
}
Loading

0 comments on commit b2c5198

Please sign in to comment.