Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/automatic resubscribe #398

Merged
merged 22 commits into from
Apr 18, 2020
Merged

Conversation

SgtSilvio
Copy link
Member

@SgtSilvio SgtSilvio commented Apr 6, 2020

Motivation
Resolves #297 and resolves #288
This is a prerequisite for proper implementation of manual acknowledgement and message persistence.

Changes

  • Enable subscribe(Stream) and unsubscribe independent of connect (similar to publishes)
  • Automatic resubscribe when automatically reconnecting and no session is present
  • Flows are not expired when automatic reconnect is enabled (instead subscriptions are automatically restored)
  • Improved concurrency of MqttPublishFlowableAckLink, FlowableWithSingleObserveOn and FluxWithSinglePublishOn

@cla-bot cla-bot bot added the cla-signed label Apr 6, 2020
@SgtSilvio SgtSilvio self-assigned this Apr 6, 2020
@SgtSilvio SgtSilvio force-pushed the feature/automatic-resubscribe branch from e3d20ec to 09efec5 Compare April 6, 2020 10:17
@SgtSilvio SgtSilvio requested review from gitseti, a user, YannickWeber and RobinAtherton and removed request for YannickWeber April 8, 2020 09:15
@SgtSilvio SgtSilvio mentioned this pull request Apr 15, 2020
@SgtSilvio SgtSilvio removed the request for review from RobinAtherton April 16, 2020 14:21
… to publishes)

Subscription and global flows are not expired when reconnect is enabled
Removed Flowable/FluxWithSingleCombine which had concurrency issues
Restored Flowable/FluxWithSingleCombine and fixed concurrency issues
…s more consistent: MqttIncomingPublishFlow(s) and MqttSubscribedPublishFlow(s))
@SgtSilvio SgtSilvio force-pushed the feature/automatic-resubscribe branch from 7b7a86e to 828edad Compare April 17, 2020 09:21
@sauroter sauroter self-requested a review April 17, 2020 09:21
subscriptionIdentifiersAvailable = connectionConfig.areSubscriptionIdentifiersAvailable();

if (!hasSession) {
incomingPublishFlows.getSubscriptions().forEach((subscriptionIdentifier, subscriptions) -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this bahavior should be configurable somehow.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should be configurable?
If you use automatic reconnect and the server lost the session between the connection loss and a successful reconnect, why would you not want to resubscribe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually feedback from users who are confused by SessionExpiredExceptions when they use automatic reconnect.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use case: You are in extremely constrained networks and want to be very precise on which data you send and when you send it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same is true for resending publishes.
Isn't automatic reconnect the wrong choice if you want to have control over everything?

Copy link
Member

@sauroter sauroter Apr 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same is true for resending publishes.

I did not know that. But I would argue that the same logic applies.

I feel like the connect is something else. As it is the basis for everything afterwards. And in the connect there are possibilities to influence the behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So would a config to disable automatic resubscribe be enough?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah for a start I think so.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added configuration option: Reconnector.resubscribeIfSessionExpired (default true)
Also added a similar option for republishing: Reconnector.republishIfSessionExpired (default false)

Copy link
Member

@sauroter sauroter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code looks good. The question about the configurability of the resubscribe must be resolved.

…ibedPublishFlowTree, reasons:

the first is tightly coupled to the MqttOutgoingQosHandler
the second is similar to a collection which is instantiated
@SgtSilvio SgtSilvio merged commit d7b38e9 into develop Apr 18, 2020
@SgtSilvio SgtSilvio deleted the feature/automatic-resubscribe branch April 18, 2020 21:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
2 participants