From 6db9def22eb020da9f7346504e4c70acaec3d7d3 Mon Sep 17 00:00:00 2001 From: "Michael Bradley, Jr" Date: Wed, 5 Dec 2018 19:26:01 -0600 Subject: [PATCH] refactor(@embark/utils): use timeouts to improve utils.pingEndpoint For reasons unknown, sockets underlying http and websocket requests made by `utils.pingEndpoint` could become stuck and after a *long* time eventually hang up. This can happen even when the endpoint being pinged is perfectly responsive, e.g. when checking it with `curl` in another terminal. The root cause may be a bug or poorly documented behavior in node.js itself. The effects of the stuck sockets are experienced more often when the CPU is under moderate-to-heavy load, and also more often when pinging embark's blockchain proxy vs. pinging geth, ipfs, etc. directly. Use timeouts to cut short the stuck sockets. Usually, after one-to-several retries (which may get stuck) following a stuck socket, connections behave as expected (learned by experience). Assume that `T` time of inactivity on a "ping socket" indicates that the socket is stuck; increase `T` exponentially on each retry until it reaches a max of 3000ms, and don't continue retrying beyond 600000ms (10 minutes). Make the initial `T` value 100ms. Once a retry is underway following a timeout event, ignore all further events on the previous ping attempt. Note that when the CPU is moderately busy, one of the slowest steps is launching embark's child processes, e.g. ipfs, geth, etc. However, even under very heavy load, with the revised `pingEndpoint` embark seems to always eventually start/detect services correctly. And under light to moderate load, the effect of the stuck sockets is much less noticeable. Use a websocket client from the `ws` package instead of manually building websocket headers. Use `pingEndpoint` for the IPFS module's `_checkService` method to avoid the same stuck socket problem that was affecting blockchain detection. 
--- src/lib/modules/blockchain_process/proxy.js | 2 +- src/lib/modules/ipfs/index.js | 36 ++++-- src/lib/utils/utils.js | 122 ++++++++++++++------ 3 files changed, 116 insertions(+), 44 deletions(-) diff --git a/src/lib/modules/blockchain_process/proxy.js b/src/lib/modules/blockchain_process/proxy.js index aa6e4ae23e..6ecc8bcdb2 100644 --- a/src/lib/modules/blockchain_process/proxy.js +++ b/src/lib/modules/blockchain_process/proxy.js @@ -112,7 +112,7 @@ exports.serve = async (ipc, host, port, ws, origin) => { 'http', origin ? origin.split(',')[0] : undefined, (err) => { - if (!err || (Date.now() - start > 10000)) { + if (!err || (Date.now() - start > 600000)) { resolve(); } else { utils.timer(250).then(waitOnTarget).then(resolve); diff --git a/src/lib/modules/ipfs/index.js b/src/lib/modules/ipfs/index.js index 8e483c317f..bd2abeafa9 100644 --- a/src/lib/modules/ipfs/index.js +++ b/src/lib/modules/ipfs/index.js @@ -4,6 +4,7 @@ const fs = require('../../core/fs.js'); const IpfsApi = require('ipfs-api'); // TODO: not great, breaks module isolation const StorageProcessesLauncher = require('../storage/storageProcessesLauncher'); +const {canonicalHost} = require('../../utils/host'); class IPFS { @@ -67,7 +68,7 @@ class IPFS { }); self.events.request("services:register", 'IPFS', function (cb) { - self._checkService((err, body) => { + self._checkService(true, (err, body) => { if (err) { self.logger.trace("IPFS unavailable"); return cb({name: "IPFS ", status: 'off'}); @@ -82,21 +83,42 @@ class IPFS { }); } - _getNodeUrl() { + _getNodeConfig() { if (this.storageConfig.upload.provider === 'ipfs') { - return utils.buildUrlFromConfig(this.storageConfig.upload) + '/api/v0/version'; + return this.storageConfig.upload; } for (let connection of this.storageConfig.dappConnection) { if (connection.provider === 'ipfs') { - return utils.buildUrlFromConfig(connection) + '/api/v0/version'; + return connection; } } } - _checkService(cb) { - let url = this._getNodeUrl(); - 
utils.getJson(url, cb); + _checkService(getJson, cb) { + let _cb = cb || function () {}; + let _getJson = getJson; + if (typeof getJson === 'function') { + _cb = getJson; + _getJson = false; + } + const cfg = this._getNodeConfig(); + utils.pingEndpoint( + canonicalHost(cfg.host), + cfg.port, + false, + cfg.protocol === 'https' ? cfg.protocol : 'http', + utils.buildUrlFromConfig(cfg), + (err) => { + if (err) { + _cb(err); + } else if (_getJson) { + utils.getJson(utils.buildUrlFromConfig(cfg) + '/api/v0/version', _cb); + } else { + _cb(); + } + } + ); } addStorageProviderToEmbarkJS() { diff --git a/src/lib/utils/utils.js b/src/lib/utils/utils.js index b21317f81d..ddcb341dac 100644 --- a/src/lib/utils/utils.js +++ b/src/lib/utils/utils.js @@ -89,45 +89,95 @@ function getJson(url, cb) { httpGetJson(url, cb); } -function pingEndpoint(host, port, type, protocol, origin, callback) { - const options = { - protocolVersion: 13, - perMessageDeflate: true, - origin: origin, - host: host, - port: port +function pingEndpoint(host, port, type, protocol, origin, callback, count = 0, start = Date.now()) { + // remove trailing api key from infura, e.g. rinkeby.infura.io/nmY8WtT4QfEwz2S7wTbl + const _host = host.indexOf('/') > -1 ? host.split('/')[0] : host; + const maxWait = 600000; + const timeout = 100; + + const adjustTimeout = () => { + const t = Math.floor( + (1 + (0.1 * (count && Math.pow(2, count - 1)))) * timeout + ); + return t > 3000 ? 
3000 : t; }; - if (type === 'ws') { - options.headers = { - 'Sec-WebSocket-Version': 13, - Connection: 'Upgrade', - Upgrade: 'websocket', - 'Sec-WebSocket-Extensions': 'permessage-deflate; client_max_window_bits', - Origin: origin - }; - } - let req; - // remove trailing api key from infura, ie rinkeby.infura.io/nmY8WtT4QfEwz2S7wTbl - if (options.host.indexOf('/') > -1) { - options.host = options.host.split('/')[0]; - } - if (protocol === 'https') { - req = require('https').get(options); - } else { - req = require('http').get(options); - } - req.on('error', (err) => { - callback(err); - }); + let alreadyClosed = false; + let retrying = false; - req.on('response', (_response) => { - callback(); - }); + const shouldCallback = (...args) => { + if (args.length) { + callback._shouldCallback = !!args[0]; + } + let shouldCallback; + if (callback.hasOwnProperty('_shouldCallback')) { + shouldCallback = !!callback._shouldCallback; + } else { + shouldCallback = true; + } + return shouldCallback; + }; - req.on('upgrade', (_res, _socket, _head) => { - callback(); - }); + const cleanup = (req, closeMethod) => { + if (!alreadyClosed) { + alreadyClosed = true; + setImmediate(() => { req[closeMethod](); }); + } + }; + + const handleEvent = (req, closeMethod, retryCond, ...args) => { + if (shouldCallback()) { + if (!retrying && retryCond) { + retrying = true; + setImmediate(() => { + pingEndpoint( + host, port, type, protocol, origin, callback, ++count, start + ); + }); + } else if (!alreadyClosed) { + shouldCallback(false); + callback(...args); + } + } + // following cleanup any later events on req will effectively be ignored + cleanup(req, closeMethod); + }; + + const handleError = (req, closeMethod) => { + req.on('error', (err) => { + handleEvent( + req, + closeMethod, + (/timed out/).test(err.message) && Date.now() - start < maxWait, + err + ); + }); + }; + + const handleSuccess = (req, closeMethod, event) => { + req.once(event, () => { handleEvent(req, closeMethod, false); }); 
+ }; + + const handleRequest = (req, closeMethod, event) => { + handleError(req, closeMethod); + handleSuccess(req, closeMethod, event); + }; + + if (type === 'ws') { + const req = new (require('ws'))( + `${protocol === 'https' ? 'wss' : 'ws'}://${_host}:${port}/`, + {handshakeTimeout: adjustTimeout(), origin} + ); + handleRequest(req, 'close', 'open'); + } else { + const req = (protocol === 'https' ? require('https') : require('http')).get( + {host: _host, origin, port} + ); + handleRequest(req, 'abort', 'response'); + req.setTimeout(adjustTimeout(), () => { + req.emit('error', new Error('timed out')); + }); + } } function runCmd(cmd, options, callback) { @@ -554,7 +604,7 @@ function isNotFolder(node){ } function byName(a, b) { - return a.name.localeCompare(b.name); + return a.name.localeCompare(b.name); } function fileTreeSort(nodes){