From 8ebda1d7557ad315f582fb2a222277aa9c925872 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kat=20March=C3=A1n?= Date: Sat, 1 Apr 2017 00:03:39 -0700 Subject: [PATCH] fix(caching): a bunch of cache-related fixes --- cache.js | 181 +++++++++++++++++++++++++++++++++++-------------------- index.js | 108 +++++++++++++++++++++++++-------- 2 files changed, 196 insertions(+), 93 deletions(-) diff --git a/cache.js b/cache.js index 4368c4d..95fa91e 100644 --- a/cache.js +++ b/cache.js @@ -38,9 +38,20 @@ module.exports = class Cache { // Returns a Promise that resolves to the response associated with the first // matching request in the Cache object. match (request, opts) { - request = new fetch.Request(request) - return cacache.get.info(this._path, cacheKey(request)).then(info => { - if (info && matchDetails(request, info.metadata, opts)) { + const req = new fetch.Request(request) + return cacache.get.info(this._path, cacheKey(req)).then(info => { + if (info && matchDetails(req, { + url: info.metadata.url, + reqHeaders: new fetch.Headers(info.metadata.reqHeaders), + resHeaders: new fetch.Headers(info.metadata.resHeaders) + })) { + if (req.method === 'HEAD') { + return new fetch.Response(null, { + url: req.url, + headers: info.metadata.resHeaders, + status: 200 + }) + } return new this.Promise((resolve, reject) => { fs.stat(info.path, (err, stat) => { if (err) { @@ -50,26 +61,37 @@ module.exports = class Cache { } }) }).then(stat => { - let body - if (stat.size > MAX_MEM_SIZE) { - body = cacache.get.stream.byDigest(this._path, info.digest, { - hashAlgorithm: info.hashAlgorithm - }) - } else { - // cacache is much faster at bulk reads - body = through() - cacache.get.byDigest(this._path, info.digest, { - hashAlgorithm: info.hashAlgorithm, - memoize: true - }).then(data => { - body.write(data, () => { - body.end() - }) - }, err => body.emit('error', err)) - } + const cachePath = this._path + let disturbed = false + // avoid opening cache file handles until a user actually tries to + // read from it. + const body = through((chunk, enc, cb) => { + if (disturbed) { + cb(null, chunk, enc) + } else { + disturbed = true + if (stat.size > MAX_MEM_SIZE) { + pipe(cacache.get.stream.byDigest(cachePath, info.digest, { + hashAlgorithm: info.hashAlgorithm + }), body, () => {}) + } else { + // cacache is much faster at bulk reads + cacache.get.byDigest(cachePath, info.digest, { + hashAlgorithm: info.hashAlgorithm, + memoize: true + }).then(data => { + body.write(data, () => { + body.end() + }) + }, err => body.emit('error', err)) + } + cb() // throw away dummy data + } + }) + body.write('dummy') return new fetch.Response(body, { - url: request.url, - headers: info.metadata.headers, + url: req.url, + headers: info.metadata.resHeaders, status: 200, size: stat.size }) @@ -80,56 +102,74 @@ module.exports = class Cache { }) } - // Returns a Promise that resolves to an array of all matching requests in - // the Cache object. - matchAll (request, options) { - return this.Promise.reject(new Error('Cache.matchAll not implemented')) - } - - // Takes a URL, retrieves it and adds the resulting response object to the - // given cache. This is fuctionally equivalent to calling fetch(), then using - // Cache.put() to add the results to the cache. - add (request) { - return this.Promise.reject(new Error('Cache.add not implemented')) - } - - // Takes an array of URLs, retrieves them, and adds the resulting response - // objects to the given cache. - addAll (requests) { - return this.Promise.reject(new Error('Cache.addAll not implemented')) - } - // Takes both a request and its response and adds it to the given cache. put (request, response) { const req = new fetch.Request(request) const size = response.headers.get('content-length') const fitInMemory = !!size && size < MAX_MEM_SIZE + const warningCode = (response.headers.get('Warning') || '').match(/^\d+/) + if (warningCode && +warningCode >= 100 && +warningCode < 200) { + // https://tools.ietf.org/html/rfc7234#section-4.3.4 + response.headers.delete('Warning') + } const opts = { metadata: { - url: request.url, - headers: response.headers.raw() + url: req.url, + reqHeaders: req.headers.raw(), + resHeaders: response.headers.raw() }, uid: this._uid, gid: this._gid, size, memoize: fitInMemory } + if (req.method === 'HEAD' || response.status === 304) { + // Update metadata without writing + return cacache.get.info(this._path, cacheKey(req)).then(info => { + // Providing these will bypass content write + opts.hashAlgorithm = info.hashAlgorithm + opts.digest = info.digest + return new this.Promise((resolve, reject) => { + pipe( + cacache.get.stream.byDigest(this._path, info.digest, opts), + cacache.put.stream(this._path, cacheKey(req), opts), + err => err ? reject(err) : resolve(response) + ) + }) + }).then(() => response) + } let buf = [] let bufSize = 0 - let cacheStream = fitInMemory - ? to({highWaterMark: MAX_MEM_SIZE}, (chunk, enc, cb) => { - buf.push(chunk) - bufSize += chunk.length - cb() + let cacheTargetStream = false + const cachePath = this._path + let cacheStream = to((chunk, enc, cb) => { + if (!cacheTargetStream) { + if (fitInMemory) { + cacheTargetStream = + to({highWaterMark: MAX_MEM_SIZE}, (chunk, enc, cb) => { + buf.push(chunk) + bufSize += chunk.length + cb() + }, done => { + cacache.put( + cachePath, + cacheKey(req), + Buffer.concat(buf, bufSize), + opts + ).then( + () => done(), + done + ) + }) + } else { + cacheTargetStream = + cacache.put.stream(cachePath, cacheKey(req), opts) + } + } + cacheTargetStream.write(chunk, enc, cb) }, done => { - cacache.put( - this._path, - cacheKey(req), - Buffer.concat(buf, bufSize), - opts - ).then(() => done(), done) + cacheTargetStream ? cacheTargetStream.end(done) : done() }) - : cacache.put.stream(this._path, cacheKey(req), opts) const oldBody = response.body const newBody = through({highWaterMark: fitInMemory && MAX_MEM_SIZE}) response.body = newBody @@ -141,7 +181,11 @@ module.exports = class Cache { newBody.write(chunk, enc, cb) }) }, done => { - cacheStream.end(() => newBody.end(done)) + cacheStream.end(() => { + newBody.end(() => { + done() + }) + }) }), err => err && newBody.emit('error', err)) return response } @@ -153,26 +197,29 @@ module.exports = class Cache { const req = new fetch.Request(request) return cacache.rm.entry( this._path, - cacheKey(req.url) + cacheKey(req) // TODO - true/false ).then(() => false) } - - keys (request, options) { - return cacache.ls(this._path).then(entries => Object.keys(entries)) - } } function matchDetails (req, cached, opts) { const reqUrl = url.parse(req.url) const cacheUrl = url.parse(cached.url) - if (!(opts && opts.ignoreSearch) && (cacheUrl.search !== reqUrl.search)) { - return false - } - if (!(opts && opts.ignoreMethod) && req.method && req.method !== 'GET') { - return false + const vary = cached.resHeaders.get('Vary') + // https://tools.ietf.org/html/rfc7234#section-4.1 + if (vary) { + if (vary.match(/\*/)) { + return false + } else { + const fieldsMatch = vary.split(/\s*,\s*/).every(field => { + return cached.reqHeaders.get(field) === req.headers.get(field) + }) + if (!fieldsMatch) { + return false + } + } } - // TODO - opts.ignoreVary? reqUrl.hash = null cacheUrl.hash = null return url.format(reqUrl) === url.format(cacheUrl) diff --git a/index.js b/index.js index 9d44fd0..88b9804 100644 --- a/index.js +++ b/index.js @@ -16,7 +16,7 @@ module.exports = cachingFetch function cachingFetch (uri, _opts) { const opts = {} Object.keys(_opts || {}).forEach(k => { opts[k] = _opts[k] }) - opts.method = opts.method && opts.method.toUpperCase() + opts.method = (opts.method || 'GET').toUpperCase() if (typeof opts.cacheManager === 'string' && !Cache) { // Default cacache-based cache Cache = require('./cache') @@ -38,12 +38,16 @@ function cachingFetch (uri, _opts) { opts.cache = 'no-store' } if ( - (!opts.method || opts.method === 'GET') && + (opts.method === 'GET' || opts.method === 'HEAD') && opts.cacheManager && opts.cache !== 'no-store' && opts.cache !== 'reload' ) { - return opts.cacheManager.match(uri, opts.cacheOpts).then(res => { + const req = new fetch.Request(uri, { + method: opts.method, + headers: opts.headers + }) + return opts.cacheManager.match(req, opts.cacheOpts).then(res => { if (res && opts.cache === 'default' && !isStale(res)) { return res } else if (res && (opts.cache === 'default' || opts.cache === 'no-cache')) { @@ -63,6 +67,13 @@ function cachingFetch (uri, _opts) { // https://tools.ietf.org/html/rfc7234#section-4.2 function isStale (res) { if (!res) { return null } + const ctrl = res.headers.get('Cache-Control') || '' + if (ctrl.match(/must-revalidate/i)) { + return true + } + if (ctrl.match(/immutable/i)) { + return false + } const maxAge = freshnessLifetime(res) const currentAge = (new Date() - new Date(res.headers.get('Date') || new Date())) / 1000 return maxAge <= currentAge @@ -71,8 +82,12 @@ function isStale (res) { // https://tools.ietf.org/html/rfc7234#section-4.2.1 function freshnessLifetime (res) { const cacheControl = res.headers.get('Cache-Control') || '' - const maxAgeMatch = cacheControl.match(/(s-maxage|max-age)\s*=\s*(\d+)/i) - const noCacheMatch = cacheControl.match(/no-cache/i) + const pragma = res.headers.get('Pragma') || '' + const maxAgeMatch = cacheControl.match(/(?:s-maxage|max-age)\s*=\s*(\d+)/i) + const noCacheMatch = ( + cacheControl.match(/no-cache/i) || + pragma.match(/no-cache/i) + ) if (noCacheMatch) { // no-cache requires revalidation on every request return 0 @@ -91,7 +106,9 @@ function freshnessLifetime (res) { function heuristicFreshness (res) { const lastMod = res.headers.get('Last-Modified') const date = new Date(res.headers.get('Date') || new Date()) - !res.headers.get('Warning') && res.headers.set('Warning', 113) + !res.headers.get('Warning') && setWarning( + res, 113, 'Used heuristics to calculate cache freshness' + ) if (lastMod) { const age = (date - new Date(lastMod)) / 1000 return Math.min(age * 0.1, 300) @@ -101,45 +118,60 @@ function heuristicFreshness (res) { } function condFetch (uri, cachedRes, opts) { + const ctrl = cachedRes.headers.get('cache-control') || '' const newHeaders = {} Object.keys(opts.headers || {}).forEach(k => { newHeaders[k] = opts.headers[k] }) if (cachedRes.headers.get('etag')) { - const condHeader = opts.method && opts.method !== 'GET' + const condHeader = opts.method !== 'GET' ? 'if-match' : 'if-none-match' newHeaders[condHeader] = cachedRes.headers.get('etag') } if (cachedRes.headers.get('last-modified')) { - const condHeader = opts.method && opts.method !== 'GET' + const condHeader = opts.method !== 'GET' && opts.method !== 'HEAD' ? 'if-unmodified-since' : 'if-modified-since' newHeaders[condHeader] = cachedRes.headers.get('last-modified') } opts.headers = newHeaders + if (isStale(cachedRes)) { + setWarning(cachedRes, 110, 'Local cached response stale') + } return remoteFetch(uri, opts).then(condRes => { - const ctrl = cachedRes.headers.get('cache-control') || '' if (condRes.status === 304) { - // TODO - freshen up the cached entry condRes.body = cachedRes.body + condRes.headers.set('Warning', cachedRes.headers.get('Warning')) } else if (condRes.status >= 500 && !ctrl.match(/must-revalidate/i)) { - if (condRes.method === 'GET') { - return cachedRes - } else { - return opts.cacheManager.delete(uri).then(() => cachedRes) - } + setWarning( + cachedRes, 111, `Revalidation failed. Returning stale response` + ) + return cachedRes } - if (condRes.method !== 'GET') { - return opts.cacheManager.delete(uri).then(() => condRes) + return condRes + }).catch(err => { + if (ctrl.match(/must-revalidate/i)) { + throw err } else { - return condRes + setWarning(cachedRes, 111, `Unexpected error: ${err.message}`) + return cachedRes } - }).catch(() => { - return cachedRes }) } +function setWarning (reqOrRes, code, message, host) { + host = host || 'localhost' + reqOrRes.headers.set( + 'Warning', + `${code} ${host} ${ + JSON.stringify(message) + } ${ + JSON.stringify(new Date().toUTCString) + }` + ) +} + function remoteFetch (uri, opts) { const agent = getAgent(uri, opts) const headers = { @@ -151,22 +183,40 @@ function remoteFetch (uri, opts) { headers[k] = opts.headers[k] }) } - const reqOpts = Object.create(opts) - reqOpts.headers = headers - reqOpts.agent = agent + const reqOpts = { + agent, + body: opts.body, + compress: opts.compress, + follow: opts.follow, + headers, + method: opts.method, + redirect: opts.redirect, + size: opts.size, + timeout: opts.timeout + } return retry((retryHandler, attemptNum) => { const req = new fetch.Request(uri, reqOpts) return fetch(req).then(res => { const cacheCtrl = res.headers.get('cache-control') || '' if ( - req.method === 'GET' && + (req.method === 'GET' || req.method === 'HEAD') && opts.cacheManager && !cacheCtrl.match(/no-store/i) && opts.cache !== 'no-store' && // No other statuses should be stored! - res.status === 200 + (res.status === 200 || res.status === 304) ) { return opts.cacheManager.put(req, res, opts.cacheOpts) + } else if (opts.cacheManager && ( + (req.method !== 'GET' && req.method !== 'PUT') + )) { + return opts.cacheManager.delete(req, opts.cacheOpts).then(() => { + if (req.method !== 'POST' && res.status >= 500) { + return retryHandler(res) + } else { + return res + } + }) } else if (req.method !== 'POST' && res.status >= 500) { return retryHandler(res) } else { @@ -179,7 +229,13 @@ function remoteFetch (uri, opts) { throw err } }) - }, opts.retry) + }, opts.retry).catch(err => { + if (err.status >= 500) { + return err + } else { + throw err + } + }) } function getAgent (uri, opts) {