Skip to content

Commit

Permalink
Merge pull request #322 from hivemq/develop
Browse files Browse the repository at this point in the history
Release 1.1.2
  • Loading branch information
SgtSilvio committed Aug 9, 2019
2 parents f678684 + 70c56fd commit e0c677f
Show file tree
Hide file tree
Showing 30 changed files with 412 additions and 118 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

[![Build Status](https://travis-ci.org/hivemq/hivemq-mqtt-client.svg?branch=develop)](https://travis-ci.org/hivemq/hivemq-mqtt-client)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.hivemq/hivemq-mqtt-client/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.hivemq/hivemq-mqtt-client)
[![JitPack](https://jitpack.io/v/hivemq/hivemq-mqtt-client.svg)](https://jitpack.io/#hivemq/hivemq-mqtt-client)

MQTT 5.0 and 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and
backpressure support.
Expand Down Expand Up @@ -46,7 +47,7 @@ If you use Gradle, just include the following inside your `build.gradle` file.

```groovy
dependencies {
compile group: 'com.hivemq', name: 'hivemq-mqtt-client', version: '1.1.1'
compile group: 'com.hivemq', name: 'hivemq-mqtt-client', version: '1.1.2'
}
```

Expand All @@ -68,7 +69,7 @@ NOTE: You have to set the compiler version to `1.8` or higher.
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.1.1</version>
<version>1.1.2</version>
</dependency>
</dependencies>
...
Expand All @@ -85,7 +86,7 @@ To use the shaded version just append `-shaded` to the artifact name.

```groovy
dependencies {
compile group: 'com.hivemq', name: 'hivemq-mqtt-client-shaded', version: '1.1.1'
compile group: 'com.hivemq', name: 'hivemq-mqtt-client-shaded', version: '1.1.2'
}
```

Expand All @@ -98,7 +99,7 @@ dependencies {
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client-shaded</artifactId>
<version>1.1.1</version>
<version>1.1.2</version>
</dependency>
</dependencies>
...
Expand Down
31 changes: 29 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'biz.aQute.bnd:biz.aQute.bnd.gradle:4.2.0'
}
}

plugins {
id 'java-library'
id 'maven-publish'
Expand All @@ -10,25 +19,29 @@ plugins {
id 'com.github.johnrengelman.shadow' version '4.0.4'
id 'com.github.hierynomus.license' version '0.14.0'
id 'pmd'
id 'biz.aQute.bnd.builder' version '4.2.0'
}

group 'com.hivemq'
version '1.1.1' + (Boolean.valueOf(System.getProperty("snapshot")) ? "-SNAPSHOT" : "")
version '1.1.2' + (Boolean.valueOf(System.getProperty("snapshot")) ? "-SNAPSHOT" : "")
description 'HiveMQ MQTT Client is a MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client ' +
'library with different API flavours and backpressure support'

ext {
moduleName = 'com.hivemq.client.mqtt'
readableName = 'HiveMQ MQTT Client'
githubOrg = 'hivemq'
githubRepo = 'hivemq-mqtt-client'
githubUrl = 'https://github.com/' + githubOrg + '/' + githubRepo
scmConnection = 'scm:git:git://github.com/' + githubOrg + '/' + githubRepo + '.git'
scmDeveloperConnection = 'scm:git:ssh://git@github.com/' + githubOrg + '/' + githubRepo + '.git'
issuesUrl = githubUrl + '/issues'
docUrl = 'https://' + githubOrg + '.github.io/' + githubRepo + '/'
licenseShortName = 'Apache-2.0'
licenseReadableName = 'The Apache License, Version 2.0'
licenseUrl = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
shadedAppendix = 'shaded'
prevVersion = '1.0.1'
prevVersion = '1.1.1'
}

sourceCompatibility = 1.8
Expand Down Expand Up @@ -90,6 +103,20 @@ apply from: 'japicc.gradle'

import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

jar {
bnd('Automatic-Module-Name': project.moduleName,
'Bundle-Name': project.name,
'Bundle-SymbolicName': project.moduleName,
'Bundle-Description': project.description,
'Bundle-Vendor': 'HiveMQ and the HiveMQ Community',
'Bundle-License': project.licenseShortName + ';description="' + project.licenseReadableName + '";link="' + project.licenseUrl + '"',
'Bundle-DocURL': project.docUrl,
'Bundle-SCM': 'url="' + project.githubUrl + '";connection="' + project.scmConnection + '";developerConnection="' + project.scmDeveloperConnection + '"',
'Export-Package': 'com.hivemq.client.annotations.*, com.hivemq.client.mqtt.*, com.hivemq.client.rx.*, com.hivemq.client.util.*',
'-consumer-policy': '${range;[==,=+)}',
'-removeheaders': 'Private-Package')
}

shadowJar { ShadowJar shadowJar ->
appendix shadedAppendix
classifier null
Expand Down
12 changes: 8 additions & 4 deletions publishing.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ void addPom(MavenPublication publication) {
developerConnection = project.scmDeveloperConnection
url = project.githubUrl
}
issueManagement {
system = 'github'
url = project.issuesUrl
}
}
}

Expand Down Expand Up @@ -124,10 +128,10 @@ bintray {
repo = 'HiveMQ'
name = 'hivemq-mqtt-client'
desc = project.description
websiteUrl = githubUrl
issueTrackerUrl = githubUrl + '/issues'
vcsUrl = githubUrl + '.git'
licenses = [licenseShortName]
websiteUrl = project.githubUrl
issueTrackerUrl = project.issuesUrl
vcsUrl = project.githubUrl + '.git'
licenses = [project.licenseShortName]
labels = ['mqtt', 'mqtt-client', 'iot', 'internet-of-things', 'rxjava2', 'reactive-streams', 'backpressure']
version {
released = new Date()
Expand Down
64 changes: 51 additions & 13 deletions src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,54 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* @author Silvio Giebl
*/
public class MqttAsyncClient implements Mqtt5AsyncClient {

private static final @NotNull Function<Mqtt5SubAck, Mqtt5SubAck> SUBACK_HANDLER = MqttBlockingClient::handleSubAck;
private static final @NotNull Function<Mqtt5UnsubAck, Mqtt5UnsubAck> UNSUBACK_HANDLER =
MqttBlockingClient::handleUnsubAck;
private static @NotNull CompletableFuture<@NotNull Mqtt5SubAck> handleSubAck(
final @NotNull CompletableFuture<@NotNull Mqtt5SubAck> future, final @NotNull MqttSubscribe subscribe) {

if (subscribe.getSubscriptions().size() == 1) {
return future;
}
final CompletableFuture<Mqtt5SubAck> mappedFuture = new CompletableFuture<>();
future.whenComplete((subAck, throwable) -> {
if (throwable != null) {
mappedFuture.completeExceptionally(throwable);
} else {
try {
mappedFuture.complete(MqttBlockingClient.handleSubAck(subAck));
} catch (final Throwable t) {
mappedFuture.completeExceptionally(t);
}
}
});
return mappedFuture;
}

private static @NotNull CompletableFuture<@NotNull Mqtt5UnsubAck> handleUnsubAck(
final @NotNull CompletableFuture<@NotNull Mqtt5UnsubAck> future,
final @NotNull MqttUnsubscribe unsubscribe) {

if (unsubscribe.getTopicFilters().size() == 1) {
return future;
}
final CompletableFuture<Mqtt5UnsubAck> mappedFuture = new CompletableFuture<>();
future.whenComplete((unsubAck, throwable) -> {
if (throwable != null) {
mappedFuture.completeExceptionally(throwable);
} else {
try {
mappedFuture.complete(MqttBlockingClient.handleUnsubAck(unsubAck));
} catch (final Throwable t) {
mappedFuture.completeExceptionally(t);
}
}
});
return mappedFuture;
}

private final @NotNull MqttRxClient delegate;

Expand All @@ -74,7 +112,7 @@ public class MqttAsyncClient implements Mqtt5AsyncClient {
public @NotNull CompletableFuture<@NotNull Mqtt5SubAck> subscribe(final @Nullable Mqtt5Subscribe subscribe) {
final MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);

return RxFutureConverter.toFuture(delegate.subscribe(mqttSubscribe)).thenApply(SUBACK_HANDLER);
return handleSubAck(RxFutureConverter.toFuture(delegate.subscribe(mqttSubscribe)), mqttSubscribe);
}

@Override
Expand All @@ -84,9 +122,9 @@ public class MqttAsyncClient implements Mqtt5AsyncClient {
final MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
Checks.notNull(callback, "Callback");

return delegate.subscribeStream(mqttSubscribe)
.subscribeSingleFuture(new CallbackSubscriber(callback))
.thenApply(SUBACK_HANDLER);
return handleSubAck(
delegate.subscribeStream(mqttSubscribe).subscribeSingleFuture(new CallbackSubscriber(callback)),
mqttSubscribe);
}

@Override
Expand All @@ -98,10 +136,10 @@ public class MqttAsyncClient implements Mqtt5AsyncClient {
Checks.notNull(callback, "Callback");
Checks.notNull(executor, "Executor");

return delegate.subscribeStreamUnsafe(mqttSubscribe)
.observeOnBoth(Schedulers.from(executor), true)
.subscribeSingleFuture(new CallbackSubscriber(callback))
.thenApply(SUBACK_HANDLER);
return handleSubAck(
delegate.subscribeStreamUnsafe(mqttSubscribe)
.observeOnBoth(Schedulers.from(executor), true).subscribeSingleFuture(new CallbackSubscriber(callback)),
mqttSubscribe);
}

@Override
Expand Down Expand Up @@ -134,7 +172,7 @@ public void publishes(

final MqttUnsubscribe mqttUnsubscribe = MqttChecks.unsubscribe(unsubscribe);

return RxFutureConverter.toFuture(delegate.unsubscribe(mqttUnsubscribe)).thenApply(UNSUBACK_HANDLER);
return handleUnsubAck(RxFutureConverter.toFuture(delegate.unsubscribe(mqttUnsubscribe)), mqttUnsubscribe);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private static class MqttPublishes implements Mqtt5Publishes, FlowableSubscriber
private final @NotNull AtomicReference<@Nullable Subscription> subscription = new AtomicReference<>();
private final @NotNull LinkedList<Entry> entries = new LinkedList<>();
private @Nullable Mqtt5Publish queuedPublish;
private boolean cancelled;
private @Nullable Throwable error;

MqttPublishes(final @NotNull Flowable<Mqtt5Publish> publishes) {
publishes.subscribe(this);
Expand All @@ -192,7 +192,7 @@ private void request() {
@Override
public void onNext(final @NotNull Mqtt5Publish publish) {
synchronized (entries) {
if (cancelled) {
if (error != null) {
return;
}
Entry entry;
Expand All @@ -216,9 +216,10 @@ public void onComplete() {
@Override
public void onError(final @NotNull Throwable t) {
synchronized (entries) {
if (cancelled) {
if (error != null) {
return;
}
error = t;
Entry entry;
while ((entry = entries.poll()) != null) {
entry.result.set(t);
Expand All @@ -231,8 +232,8 @@ public void onError(final @NotNull Throwable t) {
public @NotNull Mqtt5Publish receive() throws InterruptedException {
final Entry entry;
synchronized (entries) {
if (cancelled) {
throw new CancellationException();
if (error != null) {
throw handleError(error);
}
final Mqtt5Publish publish = receiveNowUnsafe();
if (publish != null) {
Expand All @@ -253,10 +254,7 @@ public void onError(final @NotNull Throwable t) {
return (Mqtt5Publish) result;
}
if (result instanceof Throwable) {
if (result instanceof RuntimeException) {
throw AsyncRuntimeException.fillInStackTrace((RuntimeException) result);
}
throw new RuntimeException((Throwable) result);
throw handleError((Throwable) result);
}
if (interruptedException != null) {
throw interruptedException;
Expand All @@ -275,8 +273,8 @@ public void onError(final @NotNull Throwable t) {

final Entry entry;
synchronized (entries) {
if (cancelled) {
throw new CancellationException();
if (error != null) {
throw handleError(error);
}
final Mqtt5Publish publish = receiveNowUnsafe();
if (publish != null) {
Expand All @@ -297,10 +295,7 @@ public void onError(final @NotNull Throwable t) {
return Optional.of((Mqtt5Publish) result);
}
if (result instanceof Throwable) {
if (result instanceof RuntimeException) {
throw AsyncRuntimeException.fillInStackTrace((RuntimeException) result);
}
throw new RuntimeException((Throwable) result);
throw handleError((Throwable) result);
}
if (interruptedException != null) {
throw interruptedException;
Expand All @@ -312,8 +307,8 @@ public void onError(final @NotNull Throwable t) {
public @NotNull Optional<Mqtt5Publish> receiveNow() {
final Mqtt5Publish publish;
synchronized (entries) {
if (cancelled) {
throw new CancellationException();
if (error != null) {
throw handleError(error);
}
publish = receiveNowUnsafe();
}
Expand All @@ -337,18 +332,25 @@ public void close() {
subscription.cancel();
}
synchronized (entries) {
if (cancelled) {
if (error != null) {
return;
}
cancelled = true;
error = new CancellationException();
Entry entry;
while ((entry = entries.poll()) != null) {
entry.result.set(new CancellationException());
entry.result.set(error);
entry.latch.countDown();
}
}
}

private @NotNull RuntimeException handleError(final @NotNull Throwable t) {
if (t instanceof RuntimeException) {
return AsyncRuntimeException.fillInStackTrace((RuntimeException) t);
}
throw new RuntimeException(t);
}

private static class Entry {

static final @NotNull Object CANCELLED = new Object();
Expand Down
Loading

0 comments on commit e0c677f

Please sign in to comment.