Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LongPoll #814

Merged
merged 55 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
c9d4f52
set received_at in dispatchEvent
mahboubii Nov 22, 2021
b54d329
break _buildUrl
mahboubii Nov 22, 2021
ba9d5ec
first draft of ws poll
mahboubii Nov 22, 2021
e935fb7
fix tests
mahboubii Nov 22, 2021
8e4c629
Merge branch 'master' into longpoll
mahboubii Nov 24, 2021
50a56d6
refactor connection state
mahboubii Nov 24, 2021
4099feb
APIErrorCodes
mahboubii Nov 24, 2021
428c373
Merge branch 'master' into longpoll
mahboubii Nov 24, 2021
a32ac24
remove temp fallback
mahboubii Nov 24, 2021
c379c33
Merge branch 'master' into longpoll
mahboubii Nov 26, 2021
59449c9
fix wrong err code
mahboubii Nov 26, 2021
8b2b539
add ConnectionIDNotFoundError
mahboubii Nov 26, 2021
6a10060
_setConnectionID
mahboubii Nov 26, 2021
7a5c799
reset longpoll on CONNECTION_ID_ERROR
mahboubii Nov 26, 2021
eb1cb86
rm space
mahboubii Nov 29, 2021
dc2b0a7
rate limit
mahboubii Nov 29, 2021
7df789f
refactor _buildWSPayload
mahboubii Nov 29, 2021
3926056
_getConnectionID
mahboubii Nov 29, 2021
7818ee1
waitForHealthy timeout from connect
mahboubii Nov 29, 2021
3c2f056
clean fallback from WS class
mahboubii Nov 29, 2021
564aefb
fallback isHealthy
mahboubii Nov 29, 2021
de7e384
client run fallback ws
mahboubii Nov 29, 2021
ac77ddc
Merge branch 'master' into longpoll
Nov 29, 2021
8adcad9
close longpoll properly
mahboubii Nov 29, 2021
ac4fe43
delete errors.ts
mahboubii Nov 29, 2021
cb72f7e
client use fallback if WS error
mahboubii Nov 29, 2021
e8ef9e3
setLocalDevice check for wsfallback
mahboubii Nov 29, 2021
7035fd7
connection isDisconnected flag
mahboubii Nov 29, 2021
2dd8dfa
closeConnection promise
mahboubii Nov 29, 2021
b878e70
fix tests
mahboubii Nov 29, 2021
8dd4169
set this.isConnecting to false on err
mahboubii Nov 29, 2021
813a231
Merge branch 'master' into longpoll
mahboubii Nov 29, 2021
afaa2c4
reset connection id on connect
mahboubii Nov 30, 2021
ac909ab
refactor errors
mahboubii Nov 30, 2021
4268d46
retry logic
mahboubii Nov 30, 2021
e8462da
replace local port
mahboubii Nov 30, 2021
8d66397
typo
mahboubii Nov 30, 2021
db18d0a
Merge branch 'master' into longpoll
Nov 30, 2021
6d9ff79
poll sleep on err
mahboubii Nov 30, 2021
88c54b9
fix multiple recover call
mahboubii Nov 30, 2021
6e586b0
disconnect in case of non retryable errors
mahboubii Nov 30, 2021
08e8429
fallback recover state
mahboubii Nov 30, 2021
fd558ab
enableWSFallback flag warning
mahboubii Nov 30, 2021
f5a943c
check browser is online before longpoll
mahboubii Nov 30, 2021
9021139
refactor addConnectionEventListeners
mahboubii Nov 30, 2021
020a090
connection.changed events
mahboubii Dec 1, 2021
9cc7b2d
fallback logger
mahboubii Dec 1, 2021
13c95b9
removed todo
mahboubii Dec 1, 2021
840cfbd
fix event condition
mahboubii Dec 1, 2021
7c61d87
4.5.0-beta.0
mahboubii Dec 1, 2021
7b4ac06
warn for missing navigation
mahboubii Dec 1, 2021
8a989b5
fix APIError type
mahboubii Dec 1, 2021
9dbafe3
fix isOnline
mahboubii Dec 2, 2021
8229b7d
export ConnectionState
mahboubii Dec 2, 2021
a7ee9ae
test client
mahboubii Dec 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
"babel/no-invalid-this": 2,
"array-callback-return": 2,
"valid-typeof": 2,
"arrow-body-style": 2,
"react/prop-types": 0,
"no-var": 2,
"linebreak-style": [2, "unix"],
Expand Down
77 changes: 56 additions & 21 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { StableWSConnection } from './connection';
import { isValidEventType } from './events';
import { JWTUserToken, DevToken, CheckSignature } from './signing';
import { TokenManager } from './token_manager';
import { WSConnectionFallback } from './connection_fallback';
import {
isFunction,
isOwnUserBaseProperty,
Expand All @@ -20,6 +21,7 @@ import {
randomId,
sleep,
retryInterval,
isWSFailure,
} from './utils';

import {
Expand Down Expand Up @@ -146,7 +148,6 @@ export class StreamChat<
cleaningIntervalRef?: NodeJS.Timeout;
clientID?: string;
configs: Configs<CommandType>;
connectionID?: string;
key: string;
listeners: {
[key: string]: Array<
Expand Down Expand Up @@ -185,9 +186,12 @@ export class StreamChat<
MessageType,
ReactionType
> | null;
wsFallback?: WSConnectionFallback;
wsPromise: ConnectAPIResponse<ChannelType, CommandType, UserType> | null;
consecutiveFailures: number;
insightMetrics: InsightMetrics;
defaultWSTimeoutWithFallback: number;
defaultWSTimeout: number;

/**
* Initialize a client
Expand Down Expand Up @@ -273,6 +277,9 @@ export class StreamChat<
this.consecutiveFailures = 0;
this.insightMetrics = new InsightMetrics();

this.defaultWSTimeoutWithFallback = 6000;
this.defaultWSTimeout = 15000;

/**
* logger function should accept 3 parameters:
* @param logLevel string
Expand Down Expand Up @@ -433,7 +440,9 @@ export class StreamChat<
this.wsBaseURL = this.baseURL.replace('http', 'ws').replace(':3030', ':8800');
}

_hasConnectionID = () => Boolean(this.wsConnection?.connectionID);
_getConnectionID = () => this.wsConnection?.connectionID || this.wsFallback?.connectionID;

_hasConnectionID = () => Boolean(this._getConnectionID());

/**
* connectUser - Set the current user and open a WebSocket connection
Expand Down Expand Up @@ -534,17 +543,14 @@ export class StreamChat<
* @param timeout Max number of ms, to wait for close event of websocket, before forcefully assuming succesful disconnection.
* https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
*/
closeConnection = (timeout?: number) => {
closeConnection = async (timeout?: number) => {
if (this.cleaningIntervalRef != null) {
clearInterval(this.cleaningIntervalRef);
this.cleaningIntervalRef = undefined;
}

if (!this.wsConnection) {
return Promise.resolve();
}

return this.wsConnection.disconnect(timeout);
await Promise.all([this.wsConnection?.disconnect(timeout), this.wsFallback?.disconnect(timeout)]);
return Promise.resolve();
};

/**
Expand All @@ -555,7 +561,7 @@ export class StreamChat<
throw Error('User is not set on client, use client.connectUser or client.connectAnonymousUser instead');
}

if (this.wsConnection?.isHealthy && this._hasConnectionID()) {
if ((this.wsConnection?.isHealthy || this.wsFallback?.isHealthy()) && this._hasConnectionID()) {
this.logger('info', 'client:openConnection() - openConnection called twice, healthy connection already exists', {
tags: ['connection', 'client'],
});
Expand Down Expand Up @@ -736,7 +742,7 @@ export class StreamChat<
// reset client state
this.state = new ClientState();
// reset token manager
this.tokenManager.reset();
setTimeout(this.tokenManager.reset); // delay reseting to use token for disconnect calls
mahboubii marked this conversation as resolved.
Show resolved Hide resolved

// close the WS connection
return closePromise;
Expand Down Expand Up @@ -1022,6 +1028,7 @@ export class StreamChat<
this._logApiError(type, url, e);
this.consecutiveFailures += 1;
if (e.response) {
/** connection_fallback depends on this token expiration logic */
if (e.response.data.code === chatCodes.TOKEN_EXPIRED && !this.tokenManager.isStatic()) {
if (this.consecutiveFailures > 1) {
await sleep(retryInterval(this.consecutiveFailures));
Expand Down Expand Up @@ -1099,6 +1106,8 @@ export class StreamChat<
dispatchEvent = (
event: Event<AttachmentType, ChannelType, CommandType, EventType, MessageType, ReactionType, UserType>,
) => {
if (!event.received_at) event.received_at = new Date();

// client event handlers
const postListenerCallbacks = this._handleClientEvent(event);

Expand Down Expand Up @@ -1130,7 +1139,6 @@ export class StreamChat<
ReactionType,
UserType
>;
event.received_at = new Date();
this.dispatchEvent(event);
};

Expand Down Expand Up @@ -1369,13 +1377,9 @@ export class StreamChat<
};

recoverState = async () => {
this.logger(
'info',
`client:recoverState() - Start of recoverState with connectionID ${this.wsConnection?.connectionID}`,
{
tags: ['connection'],
},
);
this.logger('info', `client:recoverState() - Start of recoverState with connectionID ${this._getConnectionID()}`, {
tags: ['connection'],
});

const cids = Object.keys(this.activeChannels);
if (cids.length && this.recoverStateOnReconnect) {
Expand Down Expand Up @@ -1435,7 +1439,22 @@ export class StreamChat<
ReactionType
>({ client: this });

return await this.wsConnection.connect();
try {
// if WSFallback is enabled, ws connect should timeout faster so fallback can try
return await this.wsConnection.connect(
this.options.enableWSFallback ? this.defaultWSTimeoutWithFallback : this.defaultWSTimeout,
);
} catch (err) {
// run fallback only if it's WS/Network error and not a normal API error
if (this.options.enableWSFallback && isWSFailure(err)) {
this.wsConnection._destroyCurrentWSConnection();
this.wsConnection.disconnect().then(); // close WS so no retry
mahboubii marked this conversation as resolved.
Show resolved Hide resolved
this.wsFallback = new WSConnectionFallback({ client: (this as unknown) as StreamChat });
return await this.wsFallback.connect();
}

throw err;
}
}

/**
Expand Down Expand Up @@ -1671,7 +1690,7 @@ export class StreamChat<
*
*/
setLocalDevice(device: BaseDeviceFields) {
if (this.wsConnection) {
if (this.wsConnection || this.wsFallback) {
throw new Error('you can only set device before opening a websocket connection');
}

Expand Down Expand Up @@ -2538,7 +2557,7 @@ export class StreamChat<
user_id: this.userID,
...options.params,
api_key: this.key,
connection_id: this.wsConnection?.connectionID,
connection_id: this._getConnectionID(),
},
headers: {
Authorization: token,
Expand Down Expand Up @@ -2569,6 +2588,22 @@ export class StreamChat<
}, 500);
}

/**
* encode ws url payload
* @private
* @returns json string
*/
_buildWSPayload = (client_request_id?: string) => {
return JSON.stringify({
user_id: this.userID,
user_details: this._user,
user_token: this.tokenManager.getToken(),
server_determines_connection_id: true,
mahboubii marked this conversation as resolved.
Show resolved Hide resolved
device: this.options.device,
client_request_id,
});
};

verifyWebhook(requestBody: string, xSignature: string) {
return !!this.secret && CheckSignature(requestBody, this.secret, xSignature);
}
Expand Down
39 changes: 23 additions & 16 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export class StableWSConnection<
pingInterval: number;
healthCheckTimeoutRef?: NodeJS.Timeout;
isConnecting: boolean;
isDisconnected: boolean;
isHealthy: boolean;
isResolved?: boolean;
lastEvent: Date | null;
Expand Down Expand Up @@ -75,6 +76,8 @@ export class StableWSConnection<
this.totalFailures = 0;
/** We only make 1 attempt to reconnect at the same time.. */
this.isConnecting = false;
/** To avoid reconnect if client is disconnected */
this.isDisconnected = false;
/** Boolean that indicates if the connection promise is resolved */
this.isResolved = false;
/** Boolean that indicates if we have a working connection to the server */
Expand All @@ -95,14 +98,16 @@ export class StableWSConnection<

/**
* connect - Connect to the WS URL
*
* the default 15s timeout allows between 2~3 tries
* @return {ConnectAPIResponse<ChannelType, CommandType, UserType>} Promise that completes once the first health check message is received
*/
async connect() {
async connect(timeout = 15000) {
mahboubii marked this conversation as resolved.
Show resolved Hide resolved
if (this.isConnecting) {
throw Error(`You've called connect twice, can only attempt 1 connection at the time`);
}

this.isDisconnected = false;

try {
const healthCheck = await this._connect();
this.consecutiveFailures = 0;
Expand All @@ -128,7 +133,7 @@ export class StableWSConnection<
}
}

return await this._waitForHealthy();
return await this._waitForHealthy(timeout);
}

/**
Expand Down Expand Up @@ -160,6 +165,7 @@ export class StableWSConnection<
})(),
(async () => {
await sleep(timeout);
this.isConnecting = false;
throw new Error(
JSON.stringify({
code: '',
Expand All @@ -174,20 +180,13 @@ export class StableWSConnection<

/**
* Builds and returns the url for websocket.
* @param reqID Unique identifier generated on client side, to help tracking apis on backend.
* @private
* @returns url string
*/
_buildUrl = (reqID?: string) => {
const params = {
user_id: this.client.userID,
user_details: this.client._user,
user_token: this.client.tokenManager.getToken(),
server_determines_connection_id: true,
device: this.client.options.device,
client_request_id: reqID,
};
const qs = encodeURIComponent(JSON.stringify(params));
_buildUrl = () => {
const qs = encodeURIComponent(this.client._buildWSPayload(this.requestID));
const token = this.client.tokenManager.getToken();

return `${this.client.wsBaseURL}/connect?json=${qs}&api_key=${
this.client.key
}&authorization=${token}&stream-auth-type=${this.client.getAuthType()}&X-Stream-Client=${this.client.getUserAgent()}`;
Expand All @@ -201,6 +200,8 @@ export class StableWSConnection<
this._log(`disconnect() - Closing the websocket connection for wsID ${this.wsID}`);

this.wsID += 1;
this.isConnecting = false;
this.isDisconnected = true;

// start by removing all the listeners
if (this.healthCheckTimeoutRef) {
Expand Down Expand Up @@ -256,14 +257,14 @@ export class StableWSConnection<
* @return {ConnectAPIResponse<ChannelType, CommandType, UserType>} Promise that completes once the first health check message is received
*/
async _connect() {
if (this.isConnecting) return; // simply ignore _connect if it's currently trying to connect
if (this.isConnecting || this.isDisconnected) return; // simply ignore _connect if it's currently trying to connect
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will affect background/foreground logic on mobile, no? Once disconnected, user won't be able to connect again when app comes back to foreground

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs maintenance of it in callback to me

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you call to connect again when the app comes to foreground?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you call disconnect(), and then call connectUser() again, it should work since connect will set isDisconnected to false

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So on RN we use openConnection/closeConnection

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then it should be OK, but still worth manual testing with both longpoll enabled/disabled situation

this.isConnecting = true;
this.requestID = randomId();
this.client.insightMetrics.connectionStartTimestamp = new Date().getTime();
try {
await this.client.tokenManager.tokenReady();
this._setupConnectionPromise();
const wsURL = this._buildUrl(this.requestID);
const wsURL = this._buildUrl();
this.ws = new WebSocket(wsURL);
this.ws.onopen = this.onopen.bind(this, this.wsID);
this.ws.onclose = this.onclose.bind(this, this.wsID);
Expand Down Expand Up @@ -307,6 +308,12 @@ export class StableWSConnection<
*/
async _reconnect(options: { interval?: number; refreshToken?: boolean } = {}): Promise<void> {
this._log('_reconnect() - Initiating the reconnect');

if (this.isDisconnected) {
this._log('_reconnect() - Abort (0) since disconnect() is called');
return;
}

// only allow 1 connection at the time
if (this.isConnecting || this.isHealthy) {
this._log('_reconnect() - Abort (1) since already connecting or healthy');
Expand Down
Loading