Skip to content

Commit

Permalink
refactor(@embark/utils): use timeouts to improve utils.pingEndpoint
Browse files Browse the repository at this point in the history
For reasons unknown, sockets underlying http and websocket requests made by
`utils.pingEndpoint` could become stuck and after a *long* time eventually hang
up. This can happen even when the endpoint being pinged is perfectly
responsive, e.g. when checking it with `curl` in another terminal. The root
cause may be a bug or poorly documented behavior in node.js itself. The effects
of the stuck sockets are experienced more often when the CPU is under
moderate-heavy load, and also more often when pinging embark's blockchain proxy
vs. pinging geth, ipfs, etc. directly.

Use timeouts to cut short the stuck sockets. Usually, after one-to-several
retries (which may get stuck) following a stuck socket, connections behave as
expected (learned by experience). Assume that `T` time of inactivity on a "ping
socket" indicates that socket is stuck; increase `T` exponentially on each
retry until it reaches a max of 3000ms, and don't continue retrying beyond
600000ms (10 minutes). Make the initial `T` value 100ms. Once a retry is
underway following a timeout event, ignore all further events on the previous
ping attempt.

Note that when the CPU is moderately busy, one of the slowest steps is
launching embark's child processes, e.g. ipfs, geth, etc. However, even under
very heavy load, with the revised `pingEndpoint` embark seems to always
eventually start/detect services correctly. And under light to moderate load,
the effect of the stuck sockets is much less noticeable.

Use a websocket client from the `ws` package instead of manually building
websocket headers.

Use `pingEndpoint` for the IPFS module's `_checkService` method to avoid the
same stuck socket problem that was affecting blockchain detection.
  • Loading branch information
michaelsbradleyjr committed Dec 12, 2018
1 parent 427d2c6 commit 6db9def
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/lib/modules/blockchain_process/proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ exports.serve = async (ipc, host, port, ws, origin) => {
'http',
origin ? origin.split(',')[0] : undefined,
(err) => {
if (!err || (Date.now() - start > 10000)) {
if (!err || (Date.now() - start > 600000)) {
resolve();
} else {
utils.timer(250).then(waitOnTarget).then(resolve);
Expand Down
36 changes: 29 additions & 7 deletions src/lib/modules/ipfs/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const fs = require('../../core/fs.js');
const IpfsApi = require('ipfs-api');
// TODO: not great, breaks module isolation
const StorageProcessesLauncher = require('../storage/storageProcessesLauncher');
const {canonicalHost} = require('../../utils/host');

class IPFS {

Expand Down Expand Up @@ -67,7 +68,7 @@ class IPFS {
});

self.events.request("services:register", 'IPFS', function (cb) {
self._checkService((err, body) => {
self._checkService(true, (err, body) => {
if (err) {
self.logger.trace("IPFS unavailable");
return cb({name: "IPFS ", status: 'off'});
Expand All @@ -82,21 +83,42 @@ class IPFS {
});
}

_getNodeUrl() {
_getNodeConfig() {
if (this.storageConfig.upload.provider === 'ipfs') {
return utils.buildUrlFromConfig(this.storageConfig.upload) + '/api/v0/version';
return this.storageConfig.upload;
}

for (let connection of this.storageConfig.dappConnection) {
if (connection.provider === 'ipfs') {
return utils.buildUrlFromConfig(connection) + '/api/v0/version';
return connection;
}
}
}

_checkService(cb) {
let url = this._getNodeUrl();
utils.getJson(url, cb);
/**
 * Check whether the configured IPFS node is reachable by pinging it with
 * `utils.pingEndpoint`, optionally fetching its version JSON afterwards.
 *
 * Can be called as `_checkService(cb)` or `_checkService(getJson, cb)`.
 *
 * @param {boolean|Function} getJson - When truthy (and not a function), a
 *   successful ping is followed by `utils.getJson` on `<node url>/api/v0/version`
 *   and the callback receives that call's result. When a function is passed
 *   here, it is used as the callback and no JSON is fetched.
 * @param {Function} [cb] - Callback; invoked with the ping error on failure,
 *   with no arguments on a successful ping when JSON was not requested, or by
 *   `utils.getJson` when it was. Defaults to a no-op.
 */
_checkService(getJson, cb) {
let _cb = cb || function () {};
let _getJson = getJson;
// Support the single-argument form `_checkService(cb)`: the first argument
// is the callback and no version JSON is requested.
if (typeof getJson === 'function') {
_cb = getJson;
_getJson = false;
}
// NOTE(review): `_getNodeConfig()` appears to return undefined when no
// 'ipfs' provider is configured, in which case `cfg.host` below would
// throw — confirm callers only reach this with an ipfs config present.
const cfg = this._getNodeConfig();
utils.pingEndpoint(
canonicalHost(cfg.host),
cfg.port,
// `type` argument: not 'ws', so pingEndpoint makes an http(s) request
false,
// anything other than 'https' is pinged over plain 'http'
cfg.protocol === 'https' ? cfg.protocol : 'http',
// passed as pingEndpoint's `origin` argument
utils.buildUrlFromConfig(cfg),
(err) => {
if (err) {
// ping failed: report the error and skip the JSON request
_cb(err);
} else if (_getJson) {
// endpoint responded: fetch the version info and hand the result to the callback
utils.getJson(utils.buildUrlFromConfig(cfg) + '/api/v0/version', _cb);
} else {
// endpoint responded and the caller only wanted a reachability check
_cb();
}
}
);
}

addStorageProviderToEmbarkJS() {
Expand Down
122 changes: 86 additions & 36 deletions src/lib/utils/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,45 +89,95 @@ function getJson(url, cb) {
httpGetJson(url, cb);
}

function pingEndpoint(host, port, type, protocol, origin, callback) {
const options = {
protocolVersion: 13,
perMessageDeflate: true,
origin: origin,
host: host,
port: port
function pingEndpoint(host, port, type, protocol, origin, callback, count = 0, start = Date.now()) {
// remove trailing api key from infura, e.g. rinkeby.infura.io/nmY8WtT4QfEwz2S7wTbl
const _host = host.indexOf('/') > -1 ? host.split('/')[0] : host;
const maxWait = 600000;
const timeout = 100;

const adjustTimeout = () => {
const t = Math.floor(
(1 + (0.1 * (count && Math.pow(2, count - 1)))) * timeout
);
return t > 3000 ? 3000 : t;
};
if (type === 'ws') {
options.headers = {
'Sec-WebSocket-Version': 13,
Connection: 'Upgrade',
Upgrade: 'websocket',
'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits',
Origin: origin
};
}
let req;
// remove trailing api key from infura, ie rinkeby.infura.io/nmY8WtT4QfEwz2S7wTbl
if (options.host.indexOf('/') > -1) {
options.host = options.host.split('/')[0];
}
if (protocol === 'https') {
req = require('https').get(options);
} else {
req = require('http').get(options);
}

req.on('error', (err) => {
callback(err);
});
let alreadyClosed = false;
let retrying = false;

req.on('response', (_response) => {
callback();
});
const shouldCallback = (...args) => {
if (args.length) {
callback._shouldCallback = !!args[0];
}
let shouldCallback;
if (callback.hasOwnProperty('_shouldCallback')) {
shouldCallback = !!callback._shouldCallback;
} else {
shouldCallback = true;
}
return shouldCallback;
};

req.on('upgrade', (_res, _socket, _head) => {
callback();
});
const cleanup = (req, closeMethod) => {
if (!alreadyClosed) {
alreadyClosed = true;
setImmediate(() => { req[closeMethod](); });
}
};

const handleEvent = (req, closeMethod, retryCond, ...args) => {
if (shouldCallback()) {
if (!retrying && retryCond) {
retrying = true;
setImmediate(() => {
pingEndpoint(
host, port, type, protocol, origin, callback, ++count, start
);
});
} else if (!alreadyClosed) {
shouldCallback(false);
callback(...args);
}
}
// following cleanup any later events on req will effectively be ignored
cleanup(req, closeMethod);
};

const handleError = (req, closeMethod) => {
req.on('error', (err) => {
handleEvent(
req,
closeMethod,
(/timed out/).test(err.message) && Date.now() - start < maxWait,
err
);
});
};

const handleSuccess = (req, closeMethod, event) => {
req.once(event, () => { handleEvent(req, closeMethod, false); });
};

const handleRequest = (req, closeMethod, event) => {
handleError(req, closeMethod);
handleSuccess(req, closeMethod, event);
};

if (type === 'ws') {
const req = new (require('ws'))(
`${protocol === 'https' ? 'wss' : 'ws'}://${_host}:${port}/`,
{handshakeTimeout: adjustTimeout(), origin}
);
handleRequest(req, 'close', 'open');
} else {
const req = (protocol === 'https' ? require('https') : require('http')).get(
{host: _host, origin, port}
);
handleRequest(req, 'abort', 'response');
req.setTimeout(adjustTimeout(), () => {
req.emit('error', new Error('timed out'));
});
}
}

function runCmd(cmd, options, callback) {
Expand Down Expand Up @@ -554,7 +604,7 @@ function isNotFolder(node){
}

function byName(a, b) {
return a.name.localeCompare(b.name);
return a.name.localeCompare(b.name);
}

function fileTreeSort(nodes){
Expand Down

0 comments on commit 6db9def

Please sign in to comment.