Why RxJava was chosen instead of e.g. Reactor? #255

Closed
mpe85 opened this issue Mar 6, 2019 · 4 comments · Fixed by #388

mpe85 commented Mar 6, 2019

I'm just curious: why did you choose RxJava to implement the reactive API in your nice little library, rather than, for example, Reactor?
This is not to criticize RxJava; I would just like to know whether there were specific reasons.
Thanks in advance!

siepkes commented Mar 6, 2019

I'm also kinda curious about this. I would guess it was because of Android compatibility. Personally I would also love a Reactor API.

@SgtSilvio
Member

Sorry for the delayed response.

RxJava was chosen because of Android compatibility and because of its expressive types like Single and Completable (e.g. connect will emit exactly one ConnAck message). I know that opinions about the latter differ, but for someone who is new to Rx this might be simpler.

As RxJava and Reactor are both Reactive Streams compliant, it is really easy to write an adapter.

Example for an interface:

public interface Mqtt5ReactorClient extends Mqtt5Client {

    static @NotNull Mqtt5ReactorClient from(final @NotNull Mqtt5Client client) {
        return new MqttReactorClient(client.toRx());
    }

    default @NotNull Mono<Mqtt5ConnAck> connect() {
        return connect(MqttConnect.DEFAULT);
    }

    @NotNull Mono<Mqtt5ConnAck> connect(@NotNull Mqtt5Connect connect);

    default @NotNull Mqtt5ConnectBuilder.Nested<Mono<Mqtt5ConnAck>> connectWith() {
        return new MqttConnectBuilder.Nested<>(this::connect);
    }

    @NotNull Mono<Mqtt5SubAck> subscribe(@NotNull Mqtt5Subscribe subscribe);

    default @NotNull Mqtt5SubscribeBuilder.Nested.Start<Mono<Mqtt5SubAck>> subscribeWith() {
        return new MqttSubscribeBuilder.Nested<>(this::subscribe);
    }

    @NotNull Flux<Mqtt5Publish> subscribeStream(@NotNull Mqtt5Subscribe subscribe);

    default @NotNull Mqtt5SubscribeBuilder.Nested.Start<Flux<Mqtt5Publish>> subscribeStreamWith() {
        return new MqttSubscribeBuilder.Nested<>(this::subscribeStream);
    }

    @NotNull Flux<Mqtt5Publish> publishes(@NotNull MqttGlobalPublishFilter filter);

    @NotNull Mono<Mqtt5UnsubAck> unsubscribe(@NotNull Mqtt5Unsubscribe unsubscribe);

    default @NotNull Mqtt5UnsubscribeBuilder.Nested.Start<Mono<Mqtt5UnsubAck>> unsubscribeWith() {
        return new MqttUnsubscribeBuilder.Nested<>(this::unsubscribe);
    }

    @NotNull Flux<Mqtt5PublishResult> publish(@NotNull Publisher<Mqtt5Publish> publishFlowable);

    @NotNull Mono<Void> reauth();

    default @NotNull Mono<Void> disconnect() {
        return disconnect(MqttDisconnect.DEFAULT);
    }

    @NotNull Mono<Void> disconnect(@NotNull Mqtt5Disconnect disconnect);

    default @NotNull Mqtt5DisconnectBuilder.Nested<Mono<Void>> disconnectWith() {
        return new MqttDisconnectBuilder.Nested<>(this::disconnect);
    }
}

Example for an implementation:

public class MqttReactorClient implements Mqtt5ReactorClient {

    private final @NotNull Mqtt5RxClient delegate;

    public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) {
        this.delegate = delegate;
    }

    @Override
    public @NotNull Mono<Mqtt5ConnAck> connect(final @NotNull Mqtt5Connect connect) {
        return Mono.fromDirect(delegate.connect(connect).toFlowable());
    }

    @Override
    public @NotNull Mono<Mqtt5SubAck> subscribe(final @NotNull Mqtt5Subscribe subscribe) {
        return Mono.fromDirect(delegate.subscribe(subscribe).toFlowable());
    }

    @Override
    public @NotNull Flux<Mqtt5Publish> subscribeStream(final @NotNull Mqtt5Subscribe subscribe) {
        return Flux.from(delegate.subscribeStream(subscribe));
    }

    @Override
    public @NotNull Flux<Mqtt5Publish> publishes(final @NotNull MqttGlobalPublishFilter filter) {
        return Flux.from(delegate.publishes(filter));
    }

    @Override
    public @NotNull Mono<Mqtt5UnsubAck> unsubscribe(final @NotNull Mqtt5Unsubscribe unsubscribe) {
        return Mono.fromDirect(delegate.unsubscribe(unsubscribe).toFlowable());
    }

    @Override
    public @NotNull Flux<Mqtt5PublishResult> publish(final @NotNull Publisher<Mqtt5Publish> publishFlowable) {
        return Flux.from(delegate.publish(Flowable.fromPublisher(publishFlowable)));
    }

    @Override
    public @NotNull Mono<Void> reauth() {
        return Mono.fromDirect(delegate.reauth().toFlowable());
    }

    @Override
    public @NotNull Mono<Void> disconnect(final @NotNull Mqtt5Disconnect disconnect) {
        return Mono.fromDirect(delegate.disconnect(disconnect).toFlowable());
    }

    @Override
    public @NotNull Mqtt5ClientConfig getConfig() {
        return delegate.getConfig();
    }

    @Override
    public @NotNull Mqtt5RxClient toRx() {
        return delegate;
    }

    @Override
    public @NotNull Mqtt5AsyncClient toAsync() {
        return delegate.toAsync();
    }

    @Override
    public @NotNull Mqtt5BlockingClient toBlocking() {
        return delegate.toBlocking();
    }
}
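
The adapter above stays this small because both libraries speak the same Publisher/Subscriber contract. As a rough, dependency-free illustration of that idea (not the code of either library), the sketch below uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces. SingleLike and MonoLike are hypothetical stand-ins for an RxJava Single and a Reactor Mono, and the demand handling is deliberately simplified (for example, a non-positive request is ignored rather than signalled as an error).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

public class AdapterSketch {

    /** Hypothetical source type: emits exactly one value, then completes. */
    public static final class SingleLike<T> implements Flow.Publisher<T> {
        private final T value;

        public SingleLike(final T value) {
            this.value = value;
        }

        @Override
        public void subscribe(final Flow.Subscriber<? super T> subscriber) {
            final AtomicBoolean done = new AtomicBoolean();
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override
                public void request(final long n) {
                    // Emit the single value on the first positive request only.
                    if (n > 0 && done.compareAndSet(false, true)) {
                        subscriber.onNext(value);
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done.set(true);
                }
            });
        }
    }

    /** Hypothetical target type: wraps any Publisher, whatever library produced it. */
    public static final class MonoLike<T> {
        private final Flow.Publisher<T> source;

        private MonoLike(final Flow.Publisher<T> source) {
            this.source = source;
        }

        public static <T> MonoLike<T> from(final Flow.Publisher<T> source) {
            return new MonoLike<>(source);
        }

        /** Subscribes to the source and exposes the first item as a future. */
        public CompletableFuture<T> toFuture() {
            final CompletableFuture<T> future = new CompletableFuture<>();
            source.subscribe(new Flow.Subscriber<T>() {
                @Override
                public void onSubscribe(final Flow.Subscription subscription) {
                    subscription.request(1);
                }

                @Override
                public void onNext(final T item) {
                    future.complete(item);
                }

                @Override
                public void onError(final Throwable error) {
                    future.completeExceptionally(error);
                }

                @Override
                public void onComplete() {
                    // Has no effect if onNext already completed the future.
                    future.complete(null);
                }
            });
            return future;
        }
    }

    public static void main(final String[] args) {
        // The only coupling between the two types is the shared Publisher interface.
        final MonoLike<String> connAck = MonoLike.from(new SingleLike<>("CONNACK"));
        System.out.println(connAck.toFuture().join());
    }
}
```

The real adapter follows the same shape: Flux.from and Mono.fromDirect accept any Publisher, so the RxJava types only need toFlowable() to cross the boundary.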

We could integrate this directly into the library if the dependency on Reactor can be made optional, so that nobody is forced to have both RxJava and Reactor.

Author

mpe85 commented Jun 13, 2019

Thanks for the answer, I think it would be quite nice if one could choose between RxJava and Reactor.

SgtSilvio added this to the 1.2 milestone Mar 2, 2020
SgtSilvio self-assigned this Mar 2, 2020
@SgtSilvio
Member

@mpe85 @siepkes the hivemq-mqtt-client-reactor module is available with version 1.2.0
