From 80158ecb60e437880338491d6cd59aa720235c6f Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Fri, 5 Jul 2019 20:02:08 +0200 Subject: [PATCH 1/3] stream: null push transform in async_iterator when the readable side of a transform ends any for await loop on that transform stream should also complete. This fix prevents for await loop on a transform stream from hanging indefinitely. https://github.com/nodejs/node/pull/28566 --- lib/internal/streams/async_iterator.js | 2 +- .../test-stream-readable-async-iterators.js | 24 ++++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js index defba235adbd56..89a1dae7fdfb02 100644 --- a/lib/internal/streams/async_iterator.js +++ b/lib/internal/streams/async_iterator.js @@ -155,7 +155,7 @@ const createReadableStreamAsyncIterator = (stream) => { }); iterator[kLastPromise] = null; - finished(stream, (err) => { + finished(stream, { writable: false }, (err) => { if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { const reject = iterator[kLastReject]; // Reject if we are waiting for data in the Promise returned by next() and diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index b50a6b1734b32b..0fc701e82ed929 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -1,7 +1,7 @@ 'use strict'; const common = require('../common'); -const { Readable, PassThrough, pipeline } = require('stream'); +const { Readable, Transform, PassThrough, pipeline } = require('stream'); const assert = require('assert'); async function tests() { @@ -396,6 +396,28 @@ async function tests() { } } + console.log('readable side of a transform stream pushes null'); + { + const transform = new Transform({ + objectMode: true, + transform(chunk, enc, cb) { + cb(null, chunk); + } + }); + transform.push(0); + transform.push(1); + transform.push(null); + const mustReach = common.mustCall(); + const iter = transform[Symbol.asyncIterator](); + assert.strictEqual((await iter.next()).value, 0); + + for await (const d of iter) { + assert.strictEqual(d, 1); + } + + mustReach(); + } + { console.log('all next promises must be resolved on end'); const r = new Readable({ From aa9de1d6e47d43abf9d21936031c7f95e3049abc Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Sun, 7 Jul 2019 21:41:00 +0200 Subject: [PATCH 2/3] tests: clarify transform async iterator test --- test/parallel/test-stream-readable-async-iterators.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 0fc701e82ed929..5c77181e68e31b 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -400,9 +400,7 @@ async function tests() { { const transform = new Transform({ objectMode: true, - transform(chunk, enc, cb) { - cb(null, chunk); - } + transform: common.mustNotCall() }); transform.push(0); transform.push(1); From 6382470b37dcf1eb5c657ad65a253a28fd3cb6b1 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Fri, 12 Jul 2019 14:46:29 +0200 Subject: [PATCH 3/3] tests: fix transform async iterator test --- .../test-stream-readable-async-iterators.js | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 5c77181e68e31b..12971cb2363a80 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -396,24 +396,28 @@ async function tests() { } } - console.log('readable side of a transform stream pushes null'); { + console.log('readable side of a transform stream pushes null'); const transform = new Transform({ objectMode: true, - transform: common.mustNotCall() + transform: (chunk, enc, cb) => { cb(null, chunk); } }); transform.push(0); transform.push(1); - transform.push(null); - const mustReach = common.mustCall(); + process.nextTick(() => { + transform.push(null); + }); + + const mustReach = [ common.mustCall(), common.mustCall() ]; + const iter = transform[Symbol.asyncIterator](); assert.strictEqual((await iter.next()).value, 0); for await (const d of iter) { assert.strictEqual(d, 1); + mustReach[0](); } - - mustReach(); + mustReach[1](); } {