From b03845b9376aec590b89f753a4b7c1b47729c5f8 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 18 Jul 2019 13:15:42 +0200 Subject: [PATCH] stream: make finished call the callback if the stream is closed Make stream.finished callback invoked if stream is already closed/destroyed. PR-URL: https://github.com/nodejs/node/pull/28748 Reviewed-By: Matteo Collina Reviewed-By: Benjamin Gruenbaum Reviewed-By: James M Snell Reviewed-By: Trivikram Kamat --- lib/internal/streams/async_iterator.js | 10 -- lib/internal/streams/end-of-stream.js | 40 +++--- test/parallel/test-http-client-finished.js | 106 ++++++++++++++++ test/parallel/test-stream-finished.js | 140 ++++++++++++++++++++- 4 files changed, 266 insertions(+), 30 deletions(-) diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index 07f2191e7134ce..083befb89cc93f 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -112,16 +112,6 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({ return() { return new Promise((resolve, reject) => { const stream = this[kStream]; - - // TODO(ronag): Remove this check once finished() handles - // already ended and/or destroyed streams. - const ended = stream.destroyed || stream.readableEnded || - (stream._readableState && stream._readableState.endEmitted); - if (ended) { - resolve(createIterResult(undefined, true)); - return; - } - finished(stream, (err) => { if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { reject(err); diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 3f1c0f316cd3c6..ca6091fe55fe8a 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -28,6 +28,25 @@ function eos(stream, opts, callback) { callback = once(callback); + const onerror = (err) => { + callback.call(stream, err); + }; + + let writableFinished = stream.writableFinished || + (stream._writableState && stream._writableState.finished); + let readableEnded = stream.readableEnded || + (stream._readableState && stream._readableState.endEmitted); + + if (writableFinished || readableEnded || stream.destroyed || + stream.aborted) { + if (opts.error !== false) stream.on('error', onerror); + // A destroy(err) call emits error in nextTick. + process.nextTick(callback.bind(stream)); + return () => { + stream.removeListener('error', onerror); + }; + } + let readable = opts.readable || (opts.readable !== false && stream.readable); let writable = opts.writable || (opts.writable !== false && stream.writable); @@ -35,36 +54,23 @@ function eos(stream, opts, callback) { if (!stream.writable) onfinish(); }; - var writableEnded = stream._writableState && stream._writableState.finished; const onfinish = () => { writable = false; - writableEnded = true; + writableFinished = true; if (!readable) callback.call(stream); }; - var readableEnded = stream.readableEnded || - (stream._readableState && stream._readableState.endEmitted); const onend = () => { readable = false; readableEnded = true; if (!writable) callback.call(stream); }; - const onerror = (err) => { - callback.call(stream, err); - }; - const onclose = () => { - let err; if (readable && !readableEnded) { - if (!stream._readableState || !stream._readableState.ended) - err = new ERR_STREAM_PREMATURE_CLOSE(); - return callback.call(stream, err); - } - if (writable && !writableEnded) { - if (!stream._writableState || !stream._writableState.ended) - err = new ERR_STREAM_PREMATURE_CLOSE(); - return callback.call(stream, err); + callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); + } else if (writable && !writableFinished) { + callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE()); } }; diff --git a/test/parallel/test-http-client-finished.js b/test/parallel/test-http-client-finished.js index 2d7e5b95b3ca33..337f7b596d7442 100644 --- a/test/parallel/test-http-client-finished.js +++ b/test/parallel/test-http-client-finished.js @@ -25,3 +25,109 @@ const { finished } = require('stream'); .end(); })); } + +{ + // Test abort before finished. + + const server = http.createServer(function(req, res) { + }); + + server.listen(0, common.mustCall(function() { + const req = http.request({ + port: this.address().port + }, common.mustNotCall()); + req.abort(); + finished(req, common.mustCall(() => { + server.close(); + })); + })); +} + +{ + // Test abort after request. + + const server = http.createServer(function(req, res) { + }); + + server.listen(0, common.mustCall(function() { + const req = http.request({ + port: this.address().port + }).end(); + finished(req, (err) => { + common.expectsError({ + type: Error, + code: 'ERR_STREAM_PREMATURE_CLOSE' + })(err); + finished(req, common.mustCall(() => { + server.close(); + })); + }); + req.abort(); + })); +} + +{ + // Test abort before end. + + const server = http.createServer(function(req, res) { + res.write('test'); + }); + + server.listen(0, common.mustCall(function() { + const req = http.request({ + port: this.address().port + }).on('response', common.mustCall((res) => { + req.abort(); + finished(res, common.mustCall(() => { + finished(res, common.mustCall(() => { + server.close(); + })); + })); + })).end(); + })); +} + +{ + // Test destroy before end. + + const server = http.createServer(function(req, res) { + res.write('test'); + }); + + server.listen(0, common.mustCall(function() { + http.request({ + port: this.address().port + }).on('response', common.mustCall((res) => { + // TODO(ronag): Bug? Won't emit 'close' unless read. + res.on('data', () => {}); + res.destroy(); + finished(res, common.mustCall(() => { + finished(res, common.mustCall(() => { + server.close(); + })); + })); + })).end(); + })); +} + +{ + // Test finish after end. + + const server = http.createServer(function(req, res) { + res.end('asd'); + }); + + server.listen(0, common.mustCall(function() { + http.request({ + port: this.address().port + }).on('response', common.mustCall((res) => { + // TODO(ronag): Bug? Won't emit 'close' unless read. + res.on('data', () => {}); + finished(res, common.mustCall(() => { + finished(res, common.mustCall(() => { + server.close(); + })); + })); + })).end(); + })); +} diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js index d6361ea303635d..c5e792b45e2e4b 100644 --- a/test/parallel/test-stream-finished.js +++ b/test/parallel/test-stream-finished.js @@ -97,6 +97,18 @@ const { promisify } = require('util'); })); } +{ + const rs = new Readable(); + + finished(rs, common.mustCall((err) => { + assert(err, 'premature close error'); + })); + + rs.push(null); + rs.emit('close'); + rs.resume(); +} + { const rs = new Readable(); @@ -105,7 +117,9 @@ const { promisify } = require('util'); })); rs.push(null); - rs.emit('close'); // Should not trigger an error + rs.on('end', common.mustCall(() => { + rs.emit('close'); // Should not trigger an error + })); rs.resume(); } @@ -155,8 +169,9 @@ const { promisify } = require('util'); rs.resume(); } -// Test that calling returned function removes listeners { + // Nothing happens if disposed. + const ws = new Writable({ write(data, env, cb) { cb(); @@ -168,6 +183,8 @@ const { promisify } = require('util'); } { + // Nothing happens if disposed. + const rs = new Readable(); const removeListeners = finished(rs, common.mustNotCall()); removeListeners(); @@ -178,9 +195,126 @@ const { promisify } = require('util'); } { + // Completed if readable-like is ended before. + const streamLike = new EE(); streamLike.readableEnded = true; streamLike.readable = true; - finished(streamLike, common.mustCall); + finished(streamLike, common.mustCall()); +} + +{ + // Completed if readable-like is never ended. + + const streamLike = new EE(); + streamLike.readableEnded = false; + streamLike.readable = true; + finished(streamLike, common.expectsError({ + code: 'ERR_STREAM_PREMATURE_CLOSE' + })); + streamLike.emit('close'); +} + +{ + // Completed if writable-like is destroyed before. + + const streamLike = new EE(); + streamLike.destroyed = true; + streamLike.writable = true; + finished(streamLike, common.mustCall()); +} + +{ + // Completed if readable-like is aborted before. + + const streamLike = new EE(); + streamLike.destroyed = true; + streamLike.readable = true; + finished(streamLike, common.mustCall()); +} + +{ + // Completed if writable-like is aborted before. + + const streamLike = new EE(); + streamLike.aborted = true; + streamLike.writable = true; + finished(streamLike, common.mustCall()); +} + +{ + // Completed if readable-like is aborted before. + + const streamLike = new EE(); + streamLike.aborted = true; + streamLike.readable = true; + finished(streamLike, common.mustCall()); +} + +{ + // Completed if streamlike is finished before. + + const streamLike = new EE(); + streamLike.writableFinished = true; + streamLike.writable = true; + finished(streamLike, common.mustCall()); +} + +{ + // Premature close if stream is not finished. + + const streamLike = new EE(); + streamLike.writableFinished = false; + streamLike.writable = true; + finished(streamLike, common.expectsError({ + code: 'ERR_STREAM_PREMATURE_CLOSE' + })); + streamLike.emit('close'); +} + +{ + // Premature close if stream never emitted 'finish' + // even if writableFinished says something else. + + const streamLike = new EE(); + streamLike.writable = true; + finished(streamLike, common.expectsError({ + code: 'ERR_STREAM_PREMATURE_CLOSE' + })); + streamLike.writableFinished = true; + streamLike.emit('close'); +} + + +{ + // Premature close if stream never emitted 'end' + // even if readableEnded says something else. + + const streamLike = new EE(); + streamLike.readable = true; + finished(streamLike, common.expectsError({ + code: 'ERR_STREAM_PREMATURE_CLOSE' + })); + streamLike.readableEnded = true; streamLike.emit('close'); } + +{ + // Completes if already finished. + + const w = new Writable(); + finished(w, common.mustCall(() => { + finished(w, common.mustCall()); + })); + w.destroy(); +} + +{ + // Completes if already ended. + + const r = new Readable(); + finished(r, common.mustCall(() => { + finished(r, common.mustCall()); + })); + r.destroy(); +}