Skip to content

Commit

Permalink
fix: set local device's check for alive websocket connection (#1067)
Browse files Browse the repository at this point in the history
  • Loading branch information
santhoshvai authored Nov 10, 2022
1 parent 47d3d96 commit a47b55b
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 38 deletions.
130 changes: 93 additions & 37 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,10 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
if (apn_config?.p12_cert) {
options = {
...options,
apn_config: { ...apn_config, p12_cert: Buffer.from(apn_config.p12_cert).toString('base64') },
apn_config: {
...apn_config,
p12_cert: Buffer.from(apn_config.p12_cert).toString('base64'),
},
};
}
return await this.patch<APIResponse>(this.baseURL + '/app', options);
Expand All @@ -620,7 +623,9 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
* Revokes all tokens on application level issued before given time
*/
async revokeTokens(before: Date | string | null) {
return await this.updateAppSettings({ revoke_tokens_issued_before: this._normalizeDate(before) });
return await this.updateAppSettings({
revoke_tokens_issued_before: this._normalizeDate(before),
});
}

/**
Expand All @@ -644,7 +649,9 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
for (const userID of userIDs) {
users.push({
id: userID,
set: <Partial<UserResponse<StreamChatGenerics>>>{ revoke_tokens_issued_before: before },
set: <Partial<UserResponse<StreamChatGenerics>>>{
revoke_tokens_issued_before: before,
},
});
}

Expand Down Expand Up @@ -754,7 +761,10 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG

this.anonymous = true;
this.userID = randomId();
const anonymousUser = { id: this.userID, anon: true } as UserResponse<StreamChatGenerics>;
const anonymousUser = {
id: this.userID,
anon: true,
} as UserResponse<StreamChatGenerics>;

this._setToken(anonymousUser, '');
this._setUser(anonymousUser);
Expand All @@ -778,10 +788,12 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
let response: { access_token: string; user: UserResponse<StreamChatGenerics> } | undefined;
this.anonymous = true;
try {
response = await this.post<APIResponse & { access_token: string; user: UserResponse<StreamChatGenerics> }>(
this.baseURL + '/guest',
{ user },
);
response = await this.post<
APIResponse & {
access_token: string;
user: UserResponse<StreamChatGenerics>;
}
>(this.baseURL + '/guest', { user });
} catch (e) {
this.anonymous = false;
throw e;
Expand Down Expand Up @@ -841,11 +853,15 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
if (!(key in this.listeners)) {
this.listeners[key] = [];
}
this.logger('info', `Attaching listener for ${key} event`, { tags: ['event', 'client'] });
this.logger('info', `Attaching listener for ${key} event`, {
tags: ['event', 'client'],
});
this.listeners[key].push(callback);
return {
unsubscribe: () => {
this.logger('info', `Removing listener for ${key} event`, { tags: ['event', 'client'] });
this.logger('info', `Removing listener for ${key} event`, {
tags: ['event', 'client'],
});
this.listeners[key] = this.listeners[key].filter((el) => el !== callback);
},
};
Expand All @@ -867,7 +883,9 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
this.listeners[key] = [];
}

this.logger('info', `Removing listener for ${key} event`, { tags: ['event', 'client'] });
this.logger('info', `Removing listener for ${key} event`, {
tags: ['event', 'client'],
});
this.listeners[key] = this.listeners[key].filter((value) => value !== callback);
}

Expand Down Expand Up @@ -907,7 +925,9 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
type: string,
url: string,
data?: unknown,
options: AxiosRequestConfig & { config?: AxiosRequestConfig & { maxBodyLength?: number } } = {},
options: AxiosRequestConfig & {
config?: AxiosRequestConfig & { maxBodyLength?: number };
} = {},
): Promise<T> => {
await this.tokenManager.tokenReady();
const requestConfig = this._enrichAxiosOptions(options);
Expand Down Expand Up @@ -1298,9 +1318,13 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
);

this.logger('info', 'client:recoverState() - Querying channels finished', { tags: ['connection', 'client'] });
this.dispatchEvent({ type: 'connection.recovered' } as Event<StreamChatGenerics>);
this.dispatchEvent({
type: 'connection.recovered',
} as Event<StreamChatGenerics>);
} else {
this.dispatchEvent({ type: 'connection.recovered' } as Event<StreamChatGenerics>);
this.dispatchEvent({
type: 'connection.recovered',
} as Event<StreamChatGenerics>);
}

this.wsPromise = Promise.resolve();
Expand Down Expand Up @@ -1330,7 +1354,9 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
((this.options.wsConnection as unknown) as StableWSConnection<StreamChatGenerics>).setClient(this);
this.wsConnection = (this.options.wsConnection as unknown) as StableWSConnection<StreamChatGenerics>;
} else {
this.wsConnection = new StableWSConnection<StreamChatGenerics>({ client: this });
this.wsConnection = new StableWSConnection<StreamChatGenerics>({
client: this,
});
}

try {
Expand All @@ -1352,7 +1378,9 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG

this.wsConnection._destroyCurrentWSConnection();
this.wsConnection.disconnect().then(); // close WS so no retry
this.wsFallback = new WSConnectionFallback<StreamChatGenerics>({ client: this });
this.wsFallback = new WSConnectionFallback<StreamChatGenerics>({
client: this,
});
return await this.wsFallback.connect();
}

Expand All @@ -1370,7 +1398,11 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
const opts = { headers: { 'x-client-request-id': client_request_id } };
this.doAxiosRequest('get', this.baseURL + '/hi', null, opts).catch((e) => {
if (this.options.enableInsights) {
postInsights('http_hi_failed', { api_key: this.key, err: e, client_request_id });
postInsights('http_hi_failed', {
api_key: this.key,
err: e,
client_request_id,
});
}
});
}
Expand Down Expand Up @@ -1476,7 +1508,11 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
options: ChannelOptions = {},
stateOptions: ChannelStateOptions = {},
) {
const defaultOptions: ChannelOptions = { state: true, watch: true, presence: false };
const defaultOptions: ChannelOptions = {
state: true,
watch: true,
presence: false,
};

// Make sure we wait for the connect promise if there is a pending one
await this.wsPromise;
Expand Down Expand Up @@ -1592,7 +1628,10 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
*
*/
setLocalDevice(device: BaseDeviceFields) {
if (this.wsConnection || this.wsFallback) {
if (
(this.wsConnection?.isConnecting && this.wsPromise) ||
((this.wsConnection?.isHealthy || this.wsFallback?.isHealthy()) && this._hasConnectionID())
) {
throw new Error('you can only set device before opening a websocket connection');
}

Expand Down Expand Up @@ -1843,10 +1882,11 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
userMap[userObject.id] = userObject;
}

return await this.post<APIResponse & { users: { [key: string]: UserResponse<StreamChatGenerics> } }>(
this.baseURL + '/users',
{ users: userMap },
);
return await this.post<
APIResponse & {
users: { [key: string]: UserResponse<StreamChatGenerics> };
}
>(this.baseURL + '/users', { users: userMap });
}

/**
Expand Down Expand Up @@ -1894,10 +1934,11 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
}
}

return await this.patch<APIResponse & { users: { [key: string]: UserResponse<StreamChatGenerics> } }>(
this.baseURL + '/users',
{ users },
);
return await this.patch<
APIResponse & {
users: { [key: string]: UserResponse<StreamChatGenerics> };
}
>(this.baseURL + '/users', { users });
}

async deleteUser(
Expand All @@ -1908,10 +1949,11 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
mark_messages_deleted?: boolean;
},
) {
return await this.delete<APIResponse & { user: UserResponse<StreamChatGenerics> } & { task_id?: string }>(
this.baseURL + `/users/${userID}`,
params,
);
return await this.delete<
APIResponse & { user: UserResponse<StreamChatGenerics> } & {
task_id?: string;
}
>(this.baseURL + `/users/${userID}`, params);
}

/**
Expand All @@ -1922,12 +1964,18 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
* @return {APIResponse} A task ID
*/
async restoreUsers(user_ids: string[]) {
return await this.post<APIResponse>(this.baseURL + `/users/restore`, { user_ids });
return await this.post<APIResponse>(this.baseURL + `/users/restore`, {
user_ids,
});
}

async reactivateUser(
userID: string,
options?: { created_by_id?: string; name?: string; restore_messages?: boolean },
options?: {
created_by_id?: string;
name?: string;
restore_messages?: boolean;
},
) {
return await this.post<APIResponse & { user: UserResponse<StreamChatGenerics> }>(
this.baseURL + `/users/${userID}/reactivate`,
Expand Down Expand Up @@ -2349,7 +2397,9 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
);
return this.partialUpdateMessage(
messageId,
({ set: { pinned: false } } as unknown) as PartialMessageUpdate<StreamChatGenerics>,
({
set: { pinned: false },
} as unknown) as PartialMessageUpdate<StreamChatGenerics>,
userId,
);
}
Expand Down Expand Up @@ -2400,7 +2450,9 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
if (isString(userId)) {
clonedMessage.user_id = userId;
} else {
clonedMessage.user = { id: userId.id } as UserResponse<StreamChatGenerics>;
clonedMessage.user = {
id: userId.id,
} as UserResponse<StreamChatGenerics>;
}
}

Expand Down Expand Up @@ -2573,7 +2625,9 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
* @returns {Promise<APIResponse>}
*/
createPermission(permissionData: CustomPermissionOptions) {
return this.post<APIResponse>(`${this.baseURL}/permissions`, { ...permissionData });
return this.post<APIResponse>(`${this.baseURL}/permissions`, {
...permissionData,
});
}

/** updatePermission - updates an existing custom permission
Expand All @@ -2583,7 +2637,9 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
* @returns {Promise<APIResponse>}
*/
updatePermission(id: string, permissionData: Omit<CustomPermissionOptions, 'id'>) {
return this.put<APIResponse>(`${this.baseURL}/permissions/${id}`, { ...permissionData });
return this.put<APIResponse>(`${this.baseURL}/permissions/${id}`, {
...permissionData,
});
}

/** deletePermission - deletes a custom permission
Expand Down
4 changes: 3 additions & 1 deletion test/unit/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,9 @@ describe('Client setLocalDevice', async () => {
});

it('should throw error when updating device with ws open', async () => {
client.wsConnection = true;
client.wsConnection = new StableWSConnection({});
client.wsConnection.isHealthy = true;
client.wsConnection.connectionID = 'ID';

expect(() => client.setLocalDevice({ id: 'id3', push_provider: 'firebase' })).to.throw();
});
Expand Down

0 comments on commit a47b55b

Please sign in to comment.