Skip to content

Commit

Permalink
Added onError() method to DataSource object, which behaves much like …
Browse files Browse the repository at this point in the history
…onResults/onUpdate for catching updates from auto searches. Error handlers should be re-registered after the promise is resolved. Closes #56
  • Loading branch information
Spencer Alger committed Apr 16, 2014
1 parent 655d9fd commit bda9d7b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
7 changes: 7 additions & 0 deletions src/kibana/components/courier/courier.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ define(function (require) {
*/
courier._pendingRequests = [];

/**
* Queue of pending error handlers, they are removed as
* they are resolved.
* @type {Array}
*/
courier._errorHandlers = [];

/**
* Fetch the docs
* @type {function}
Expand Down
17 changes: 16 additions & 1 deletion src/kibana/components/courier/data_source/abstract.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ define(function (require) {
/**
* Put a request in to the courier that this Source should
* be fetched on the next run of the courier
* @return {[type]} [description]
* @return {Promise}
*/
SourceAbstract.prototype.onResults = function () {
var defer = Promise.defer();
Expand All @@ -138,6 +138,21 @@ define(function (require) {
return defer.promise;
};

/**
* similar to onResults, but allows a seperate loopy code path
* for error handling.
*
* @return {Promise}
*/
SourceAbstract.prototype.onError = function () {
var defer = Promise.defer();
this._courier._errorHandlers.push({
source: this,
defer: defer
});
return defer.promise;
};

/**
* Fetch just this source ASAP
* @param {Function} cb - callback
Expand Down
29 changes: 23 additions & 6 deletions src/kibana/components/courier/fetch/fetch.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,38 @@ define(function (require) {
var docStrategy = require('./strategy/doc');
var searchStrategy = require('./strategy/search');

module.service('couriersFetch', function (es, Promise, couriersErrors) {
module.service('couriersFetch', function (es, Promise, couriersErrors, createNotifier) {
var notify = createNotifier({
location: 'Courier Fetch'
});

var flattenRequest = function (req) {
return req.source._flatten();
};

var fetchThese = function (strategy, requests) {
function RequestErrorHandler(courier) { this._courier = courier; }
RequestErrorHandler.prototype.handle = function (req, error) {
if (!this._courier) return req.defer.reject(error);
this._courier._pendingRequests.push(req);
var handlerCount = 0;
this._courier._errorHandlers.splice(0).forEach(function (handler) {
if (handler.source !== req.source) return this._courier._pendingRequests.push(handler);
handler.defer.resolve(error);
handlerCount++;
});
if (!handlerCount) notify.fatal(new Error('unhandled error ' + (error.stack || error.message)));
};

var fetchThese = function (strategy, requests, reqErrHandler) {

var all = requests.splice(0);
return es[strategy.clientMethod]({
body: strategy.requestStatesToBody(all.map(flattenRequest))
})
.then(function (resp) {
strategy.getResponses(resp).forEach(function (resp) {
var req = all.shift();
if (resp.error) return req.defer.reject(new couriersErrors.FetchFailure(resp));
if (resp.error) return reqErrHandler.handle(req, new couriersErrors.FetchFailure(resp));
else strategy.resolveRequest(req, resp);
});

Expand All @@ -29,7 +46,7 @@ define(function (require) {
})
.catch(function (err) {
all.forEach(function (req) {
req.defer.reject(err);
reqErrHandler.handle(req, err);
});
throw err;
});
Expand All @@ -38,7 +55,7 @@ define(function (require) {
var fetchPending = function (strategy, courier) {
var requests = strategy.getPendingRequests(courier._pendingRequests);
if (!requests.length) return Promise.resolved();
else return fetchThese(strategy, requests);
else return fetchThese(strategy, requests, new RequestErrorHandler(courier));
};

var fetchASource = function (strategy, source) {
Expand All @@ -48,7 +65,7 @@ define(function (require) {
source: source,
defer: defer
}
]);
], new RequestErrorHandler());
return defer.promise;
};

Expand Down

0 comments on commit bda9d7b

Please sign in to comment.