Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: Bypass legacy destroy for pipeline and async iteration. #38505

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const {
prepareError,
} = require('_http_common');
const { OutgoingMessage } = require('_http_outgoing');
const { kDestroy } = require('internal/streams/destroy');
const Agent = require('_http_agent');
const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
Expand Down Expand Up @@ -609,6 +610,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
req.res = res;
res.req = req;
res[kDestroy] = null;

// Add our listener first, so that we guarantee socket cleanup
res.on('end', responseOnEnd);
Expand Down
10 changes: 8 additions & 2 deletions lib/_http_incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const {
} = primordials;

const { Readable, finished } = require('stream');
const { kDestroy } = require('internal/streams/destroy');

const kHeaders = Symbol('kHeaders');
const kHeadersCount = Symbol('kHeadersCount');
Expand Down Expand Up @@ -188,13 +189,18 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
this.socket.destroy(err);
const cleanup = finished(this.socket, (e) => {
cleanup();
onError(this, e || err, cb);
process.nextTick(onError, this, e || err, cb);
});
} else {
onError(this, err, cb);
process.nextTick(onError, this, err, cb);
}
};

IncomingMessage.prototype[kDestroy] = function(err) {
this.socket = null;
this.destroy(err);
};

IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
function _addHeaderLines(headers, n) {
if (headers && headers.length) {
Expand Down
12 changes: 6 additions & 6 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,7 @@ function onServerResponseClose() {
// where the ServerResponse object has already been deconstructed.
// Fortunately, that requires only a single if check. :-)
if (this._httpMessage) {
this._httpMessage.destroyed = true;
this._httpMessage._closed = true;
this._httpMessage.emit('close');
emitCloseNT(this._httpMessage);
}
}

Expand Down Expand Up @@ -837,9 +835,11 @@ function resOnFinish(req, res, socket, state, server) {
}

function emitCloseNT(self) {
self.destroyed = true;
self._closed = true;
self.emit('close');
if (!self.destroyed) {
self.destroyed = true;
self._closed = true;
self.emit('close');
}
}

// The following callback is issued after the headers have been read on a
Expand Down
59 changes: 55 additions & 4 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
codes: {
ERR_MULTIPLE_CALLBACK,
},
AbortError,
} = require('internal/errors');
const {
Symbol,
Expand Down Expand Up @@ -363,15 +364,65 @@ function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}

const kDestroyed = Symbol('kDestroyed');

function emitCloseLegacy(stream) {
stream.emit('close');
}

function emitErrorCloseLegacy(stream, err) {
stream.emit('error', err);
process.nextTick(emitCloseLegacy, stream);
}

function isDestroyed(stream) {
return stream.destroyed || stream[kDestroyed];
}

function isReadable(stream) {
return stream.readable && !stream.readableEnded && !isDestroyed(stream);
}

function isWritable(stream) {
return stream.writable && !stream.writableEnded && !isDestroyed(stream);
}

// Normalize destroy for legacy.
function destroyer(stream, err) {
if (isRequest(stream)) return stream.abort();
if (isRequest(stream.req)) return stream.req.abort();
if (typeof stream.destroy === 'function') return stream.destroy(err);
if (typeof stream.close === 'function') return stream.close();
if (isDestroyed(stream)) {
return;
}

ronag marked this conversation as resolved.
Show resolved Hide resolved
if (!err && (isReadable(stream) || isWritable(stream))) {
err = new AbortError();
}

// TODO: Remove isRequest branches.
if (typeof stream[kDestroy] === 'function') {
stream[kDestroy](err);
} else if (isRequest(stream)) {
stream.abort();
} else if (isRequest(stream.req)) {
stream.req.abort();
} else if (typeof stream.destroy === 'function') {
stream.destroy(err);
} else if (typeof stream.close === 'function') {
// TODO: Don't lose err?
stream.close();
} else if (err) {
process.nextTick(emitErrorCloseLegacy, stream);
} else {
process.nextTick(emitCloseLegacy, stream);
}

if (!stream.destroyed) {
stream[kDestroyed] = true;
}
}

module.exports = {
kDestroy,
isDestroyed,
construct,
destroyer,
destroy,
Expand Down
2 changes: 2 additions & 0 deletions lib/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const {
} = require('internal/util');

const pipeline = require('internal/streams/pipeline');
const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');

Expand All @@ -45,6 +46,7 @@ Stream.pipeline = pipeline;
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
Stream.destroy = destroyer;
ronag marked this conversation as resolved.
Show resolved Hide resolved

ObjectDefineProperty(Stream, 'promises', {
configurable: true,
Expand Down
115 changes: 115 additions & 0 deletions test/parallel/test-stream-destroy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
'use strict';

const common = require('../common');
const {
Writable,
Readable,
destroy
} = require('stream');
const assert = require('assert');
const http = require('http');

{
const r = new Readable({ read() {} });
destroy(r);
assert.strictEqual(r.destroyed, true);
r.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
r.on('close', common.mustCall());
}

{
const r = new Readable({ read() {} });
destroy(r, new Error('asd'));
assert.strictEqual(r.destroyed, true);
r.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
r.on('close', common.mustCall());
}

{
const w = new Writable({ write() {} });
destroy(w);
assert.strictEqual(w.destroyed, true);
w.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
w.on('close', common.mustCall());
}

{
const w = new Writable({ write() {} });
destroy(w, new Error('asd'));
assert.strictEqual(w.destroyed, true);
w.on('error', common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
w.on('close', common.mustCall());
}

{
const server = http.createServer((req, res) => {
destroy(req);
req.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
req.on('close', common.mustCall(() => {
res.end('hello');
}));
});

server.listen(0, () => {
const req = http.request({
port: server.address().port
});

req.write('asd');
req.on('response', (res) => {
const buf = [];
res.on('data', (data) => buf.push(data));
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(
Buffer.concat(buf),
Buffer.from('hello')
);
server.close();
}));
});
});
}

{
const server = http.createServer((req, res) => {
req
.resume()
.on('end', () => {
destroy(req);
})
.on('error', common.mustNotCall());

req.on('close', common.mustCall(() => {
res.end('hello');
}));
});

server.listen(0, () => {
const req = http.request({
port: server.address().port
});

req.write('asd');
req.on('response', (res) => {
const buf = [];
res.on('data', (data) => buf.push(data));
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(
Buffer.concat(buf),
Buffer.from('hello')
);
server.close();
}));
});
});
}