diff --git a/src/kibana/components/courier/courier.js b/src/kibana/components/courier/courier.js index 55ee3ed3a9feaf..5645fd586c202f 100644 --- a/src/kibana/components/courier/courier.js +++ b/src/kibana/components/courier/courier.js @@ -35,6 +35,13 @@ define(function (require) { */ courier._pendingRequests = []; + /** + * Queue of pending error handlers, they are removed as + * they are resolved. + * @type {Array} + */ + courier._errorHandlers = []; + /** * Fetch the docs * @type {function} diff --git a/src/kibana/components/courier/data_source/abstract.js b/src/kibana/components/courier/data_source/abstract.js index a3197f6187cd1c..7b6f17187a2e26 100644 --- a/src/kibana/components/courier/data_source/abstract.js +++ b/src/kibana/components/courier/data_source/abstract.js @@ -127,7 +127,7 @@ define(function (require) { /** * Put a request in to the courier that this Source should * be fetched on the next run of the courier - * @return {[type]} [description] + * @return {Promise} */ SourceAbstract.prototype.onResults = function () { var defer = Promise.defer(); @@ -138,6 +138,21 @@ define(function (require) { return defer.promise; }; + /** + * similar to onResults, but allows a seperate loopy code path + * for error handling. + * + * @return {Promise} + */ + SourceAbstract.prototype.onError = function () { + var defer = Promise.defer(); + this._courier._errorHandlers.push({ + source: this, + defer: defer + }); + return defer.promise; + }; + /** * Fetch just this source ASAP * @param {Function} cb - callback diff --git a/src/kibana/components/courier/fetch/fetch.js b/src/kibana/components/courier/fetch/fetch.js index b83639b56cdb0b..db67a521dd72d1 100644 --- a/src/kibana/components/courier/fetch/fetch.js +++ b/src/kibana/components/courier/fetch/fetch.js @@ -6,13 +6,30 @@ define(function (require) { var docStrategy = require('./strategy/doc'); var searchStrategy = require('./strategy/search'); - module.service('couriersFetch', function (es, Promise, couriersErrors) { + module.service('couriersFetch', function (es, Promise, couriersErrors, createNotifier) { + var notify = createNotifier({ + location: 'Courier Fetch' + }); var flattenRequest = function (req) { return req.source._flatten(); }; - var fetchThese = function (strategy, requests) { + function RequestErrorHandler(courier) { this._courier = courier; } + RequestErrorHandler.prototype.handle = function (req, error) { + if (!this._courier) return req.defer.reject(error); + this._courier._pendingRequests.push(req); + var handlerCount = 0; + this._courier._errorHandlers.splice(0).forEach(function (handler) { + if (handler.source !== req.source) return this._courier._pendingRequests.push(handler); + handler.defer.resolve(error); + handlerCount++; + }); + if (!handlerCount) notify.fatal(new Error('unhandled error ' + (error.stack || error.message))); + }; + + var fetchThese = function (strategy, requests, reqErrHandler) { + var all = requests.splice(0); return es[strategy.clientMethod]({ body: strategy.requestStatesToBody(all.map(flattenRequest)) @@ -20,7 +37,7 @@ define(function (require) { .then(function (resp) { strategy.getResponses(resp).forEach(function (resp) { var req = all.shift(); - if (resp.error) return req.defer.reject(new couriersErrors.FetchFailure(resp)); + if (resp.error) return reqErrHandler.handle(req, new couriersErrors.FetchFailure(resp)); else strategy.resolveRequest(req, resp); }); @@ -29,7 +46,7 @@ define(function (require) { }) .catch(function (err) { all.forEach(function (req) { - req.defer.reject(err); + reqErrHandler.handle(req, err); }); throw err; }); @@ -38,7 +55,7 @@ define(function (require) { var fetchPending = function (strategy, courier) { var requests = strategy.getPendingRequests(courier._pendingRequests); if (!requests.length) return Promise.resolved(); - else return fetchThese(strategy, requests); + else return fetchThese(strategy, requests, new RequestErrorHandler(courier)); }; var fetchASource = function (strategy, source) { @@ -48,7 +65,7 @@ define(function (require) { source: source, defer: defer } - ]); + ], new RequestErrorHandler()); return defer.promise; };