Skip to content

Commit

Permalink
use elapsed-time-since-boot instead of UTC time-of-day for mqtt keepa…
Browse files Browse the repository at this point in the history
…live calculations
  • Loading branch information
phoddie committed Feb 1, 2024
1 parent b461d09 commit 70f8bf1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
13 changes: 7 additions & 6 deletions examples/io/tcp/mqttclient/mqttclient.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
*/

import Timer from "timer";
import Time from "time";

const Overhead = 8;
const BufferFormat = "buffer";
Expand Down Expand Up @@ -249,7 +250,7 @@ class MQTTClient {

case MQTTClient.PINGREQ:
if (this.#options.keepalive)
this.#options.keepalive.write = Date.now();
this.#options.keepalive.write = Time.ticks;
// fall through
case MQTTClient.DISCONNECT:
if (2 > this.#writable)
Expand Down Expand Up @@ -664,7 +665,7 @@ class MQTTClient {
if (keepalive) {
options.keepalive = Timer.repeat(() => this.#keepalive(), keepalive * 250);
options.keepalive.interval = keepalive * 1000;
options.keepalive.read = options.keepalive.write = Date.now();
options.keepalive.read = options.keepalive.write = Time.ticks;
}

this.#state = "login";
Expand Down Expand Up @@ -694,7 +695,7 @@ class MQTTClient {
// traceOperation(false, operation);

if ((operation == MQTTClient.PINGRESP) && this.#options.keepalive)
this.#options.keepalive.read = Date.now();
this.#options.keepalive.read = Time.ticks;

if (MQTTClient.CONNACK === operation) {
if (msg.returnCode)
Expand Down Expand Up @@ -730,12 +731,12 @@ class MQTTClient {
}
#keepalive() {
const options = this.#options, keepalive = options.keepalive, interval = keepalive.interval;
const now = Date.now();
const now = Time.ticks;

if ((now - keepalive.read) >= (keepalive.interval + (keepalive.interval >> 1)))
if (Time.delta(keepalive.read, now) >= (keepalive.interval + (keepalive.interval >> 1)))
return void this.#onError("time out"); // no control packet received in 1.5x keepalive interval (expected PINGRESP)

if ((now - keepalive.write) < (((keepalive.interval >> 2) * 3) - 500))
if (Time.delta(keepalive.write, now) < (((keepalive.interval >> 2) * 3) - 500))
return;

// haven't sent a ping in (just under) 3/4 the keep alive interval
Expand Down
11 changes: 6 additions & 5 deletions modules/network/mqtt/mqtt.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import {Client as WSClient} from "websocket";
import {Socket} from "socket";
import Timer from "timer";
import Time from "time";

/*
* Implements a basic MQTT client. Upon creation of a client instance, provides methods for
Expand Down Expand Up @@ -206,7 +207,7 @@ export default class Client {
}
else if (0xD0 === parse.state) { // PINGRESP
if (this.#timer)
this.#timer.read = Date.now();
this.#timer.read = Time.ticks;
}
else if (0xE0 === parse.state) { // DISCONNECT
return;
Expand Down Expand Up @@ -495,7 +496,7 @@ export default class Client {

if (timeout) {
this.#timer = Timer.repeat(this.keepalive.bind(this), this.timeout >> 2);
this.#timer.read = this.#timer.write = Date.now();
this.#timer.read = this.#timer.write = Time.ticks;
}

delete this.connect;
Expand Down Expand Up @@ -575,12 +576,12 @@ export default class Client {
this.state = -1;
}
keepalive() {
const now = Date.now();
if ((now - this.#timer.read) >= (this.timeout + (this.timeout >> 1)))
const now = Time.ticks;
if (Time.delta(this.#timer.read, now) >= (this.timeout + (this.timeout >> 1)))
return void this.fail("read time out"); // nothing received in 1.5x keep alive interval is fail for client and server (in write-only client, it means client didn't receive ping response)

if (!this.server) { // client must send something within the keepalive interval. we always ping within that interval (ensures that write-only client occassionally receives something from server)
if ((now - this.#timer.write) >= (((this.timeout >> 2) * 3) - 500)) { // haven't sent a ping in (just under) 3/4 the keep alive interval
if (Time.delta(this.#timer.write, now) >= (((this.timeout >> 2) * 3) - 500)) { // haven't sent a ping in (just under) 3/4 the keep alive interval
try {
this.#timer.write = now;
this.ws.write(Uint8Array.of(0xC0, 0x00).buffer); // ping
Expand Down

0 comments on commit 70f8bf1

Please sign in to comment.