Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problems with cooperation between http and pipeline/"for await" #38262

Closed
misos1 opened this issue Apr 16, 2021 · 18 comments
Closed

Problems with cooperation between http and pipeline/"for await" #38262

misos1 opened this issue Apr 16, 2021 · 18 comments
Labels
confirmed-bug Issues with confirmed bugs. http Issues or PRs related to the http subsystem. promises Issues and PRs related to ECMAScript promises. stream Issues and PRs related to the stream subsystem.

Comments

@misos1
Copy link

misos1 commented Apr 16, 2021

Is your feature request related to a problem? Please describe.

Problem is that "for await" and pipeline destroy source stream on error ("for await" also on break) but destroying http request removes possibility to send meaningful response to client.

Example with "for await" (body length limiting):

let http = require("http");

let server = http.createServer(async function(req, res)
{
	try
	{
		let len = 0;
		for await (let chunk of req)
		{
			len += chunk.length;
			if(len > 2) throw "payload too large";
		}
		res.end("ok");
	}
	catch(err)
	{
		console.log("server log:", err);
		res.end(err);
	}
});

(async function()
{
	await new Promise(resolve => server.listen(8888, resolve));
	let req = http.request({ port: 8888, method: "post" });
	req.end("abc");
	try
	{
		let res = await new Promise((resolve, reject) => req.on("response", resolve).on("error", reject));
		let str = "";
		for await (let chunk of res) str += chunk;
		console.log("client received:", str);
	}
	catch(err)
	{
		console.log("client got unexpected error:", err);
	}
	server.close();
}());

Possible output:

server log: payload too large
client got unexpected error: Error: socket hang up
    at connResetException (node:internal/errors:642:14)
    at Socket.socketOnEnd (node:_http_client:486:23)
    at Socket.emit (node:events:381:22)
    at endReadableNT (node:internal/streams/readable:1307:12)
    at processTicksAndRejections (node:internal/process/task_queues:81:21) {
  code: 'ECONNRESET'
}

Possible solution is to not use pipeline but pipe instead. But this requires additional handling like to do req.unpipe and req.resume to avoid problems for example:

let http = require("http");
let { PassThrough } = require("stream");

let server = http.createServer(async function(req, res)
{
	try
	{
		let len = 0;
		for await (let chunk of req.pipe(new PassThrough()))
		{
			len += chunk.length;
			if(len > 2) throw "payload too large";
		}
		res.end("ok");
	}
	catch(err)
	{
		console.log("server log:", err);
		req.unpipe();
		req.resume();
		req.on("end", () => res.end(err));
	}
});

(async function()
{
	await new Promise(resolve => server.listen(8888, resolve));
	let req = http.request({ port: 8888, method: "post" });
	req.end("abc");
	try
	{
		let res = await new Promise((resolve, reject) => req.on("response", resolve).on("error", reject));
		let str = "";
		for await (let chunk of res) str += chunk;
		console.log("client received:", str);
	}
	catch(err)
	{
		console.log("client got unexpected error:", err);
	}
	server.close();
}());

This neglected solution with forgotten unpipe and resume cases EPIPE sometimes or stops after first iteration when using keepalive Agent:

let http = require("http");
let { PassThrough } = require("stream");

let server = http.createServer(async function(req, res)
{
	try
	{
		let len = 0;
		for await (let chunk of req.pipe(new PassThrough()))
		{
			len += chunk.length;
			if(len > 2) throw "payload too large";
		}
		res.end("ok");
	}
	catch(err)
	{
		res.end(err);
	}
});

(async function()
{
	await new Promise(resolve => server.listen(8888, resolve));
	let agent = new http.Agent({ keepAlive: true });
	let data = Buffer.alloc(1000000);
	for(let i = 0; ; i++)
	{
		console.log(i);
		let req = http.request({ port: 8888, method: "post"/*, agent*/ }).end(data);
		let res = await new Promise((resolve, reject) => req.on("response", resolve).on("error", reject));
		let str = "";
		for await (let chunk of res) str += chunk;
		console.log(str);
	}
}());
2572
payload too large
2573
node:internal/process/promises:245
          triggerUncaughtException(err, true /* fromPromise */);
          ^

Error: write EPIPE
    at WriteWrap.onWriteComplete [as oncomplete] (node:internal/stream_base_commons:94:16) {
  errno: -32,
  code: 'EPIPE',
  syscall: 'write'
}

Describe the solution you'd like

Pipe is too leak-prone for language which builds on GC. But a fixed and more comfortable pipeline and "for await" have unexpected consequences for http streams.

Would be great to have some options for the pipeline function to not destroy streams but instead dump them to the end in case of error (so one can send response when the pipeline ends). Or http request could have the option that destroy would just dump stream to end (and only then actually send response) instead of causing EPIPE or similar faults at client side.

@misos1 misos1 changed the title Problems with cooperation between http, pipeline and for await Problems with cooperation between http and pipeline respectively "for await" Apr 16, 2021
@misos1 misos1 changed the title Problems with cooperation between http and pipeline respectively "for await" Problems with cooperation between http and pipeline/"for await" Apr 16, 2021
@Ayase-252 Ayase-252 added the feature request Issues that request new features to be added to Node.js. label Apr 17, 2021
@benjamingr
Copy link
Member

I think this is a bug and not a feature request and that our current behaviour is incorrect. Good find.

@benjamingr benjamingr added confirmed-bug Issues with confirmed bugs. http Issues or PRs related to the http subsystem. promises Issues and PRs related to ECMAScript promises. stream Issues and PRs related to the stream subsystem. labels Apr 17, 2021
@benjamingr
Copy link
Member

cc @ronag @mcollina wdyt?

@Ayase-252 Ayase-252 removed the feature request Issues that request new features to be added to Node.js. label Apr 17, 2021
@benjamingr
Copy link
Member

This is because we call _destroy on the IncomingMessage which closes the socket:

 // If aborted and the underlying socket is not already destroyed,
  // destroy it.
  // We have to check if the socket is already destroyed because finished
  // does not call the callback when this methdod is invoked from `_http_client`
  // in `test/parallel/test-http-client-spurious-aborted.js`
  if (this.socket && !this.socket.destroyed && this.aborted) {
    this.socket.destroy(err);

This is likely a mistake in this case and we should have an opt out.

@benjamingr
Copy link
Member

Suggested fix:

  • Add a special symbol for the streams async iterator implementation
  • When converting an IncomingMessage to a readable stream - call into that symbol and set up a flag.
  • When .destroy is called like normal - if that flag is set don't .destroy the socket so the response can still happen.

Alternatively: we can also have a separate for await implementation for IncomingMessage.

@benjamingr
Copy link
Member

Something like (I didn't test):

diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js
index d09683c9a8..be971de5f9 100644
--- a/lib/_http_incoming.js
+++ b/lib/_http_incoming.js
@@ -75,7 +75,7 @@ function IncomingMessage(socket) {
   this[kTrailers] = null;
   this[kTrailersCount] = 0;
   this.rawTrailers = [];
-
+  this[kStreamClosedGracefully] = false;
   this.aborted = false;
 
   this.upgrade = null;
@@ -186,7 +186,7 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
   // We have to check if the socket is already destroyed because finished
   // does not call the callback when this methdod is invoked from `_http_client`
   // in `test/parallel/test-http-client-spurious-aborted.js`
-  if (this.socket && !this.socket.destroyed && this.aborted) {
+  if (this.socket && !this.socket.destroyed && this.aborted && !this[kStreamClosedGracefully]) {
     this.socket.destroy(err);
     const cleanup = finished(this.socket, (e) => {
       cleanup();
@@ -197,6 +197,10 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
   }
 };
 
+IncomingMessage.prototype[kAsyncIterateStream] = function() {
+  this[kStreamClosedGracefully] = true;
+};
+
 IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
 function _addHeaderLines(headers, n) {
   if (headers && headers.length) {
diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js
index cc3f5e93fd..0b67d222d7 100644
--- a/lib/internal/streams/readable.js
+++ b/lib/internal/streams/readable.js
@@ -43,6 +43,7 @@ const {
 } = primordials;
 
 module.exports = Readable;
+Readable.kAsyncIterateStream = kAsyncIterateStream;
 Readable.ReadableState = ReadableState;
 
 const EE = require('events');
@@ -71,7 +72,7 @@ const {
 } = require('internal/errors').codes;
 
 const kPaused = Symbol('kPaused');
-
+const kAsyncIterateStream = Symbol.for('nodejs.asyncIterateStream');
 // Lazy loaded to improve the startup performance.
 let StringDecoder;
 let from;
@@ -1140,6 +1141,9 @@ async function* createAsyncIterator(stream) {
     destroyImpl.destroyer(stream, err);
     throw err;
   } finally {
+    if (stream[kAsyncIterateStream]) {
+      stream[kAsyncIterateStream]();
+    }
     if (state.autoDestroy || !endEmitted) {
       // TODO(ronag): ERR_PREMATURE_CLOSE?
       destroyImpl.destroyer(stream, null);

@ronag
Copy link
Member

ronag commented Apr 19, 2021

I'm not sure I 100% follow here. I kind of get it. Can we have a smaller repro or even better a small test case?

@benjamingr
Copy link
Member

@ronag if you for await a request - the for await closes the request, which closes the socket - which means you can't write code like:

for await(const chunk of req) {
  await process(chunk);
}
res.write(someBuffer);
res.end(200);

Since the res.write would fail because the for await called .destroy on the request

@ronag
Copy link
Member

ronag commented Apr 19, 2021

I have some ideas. I'll get back later this week.

@benjamingr
Copy link
Member

@ronag I have a proposed solution above, it's pretty easy (don't .destroy the socket when closing the iterator in a for await). This is more blocked on deciding what to do than code - I'm happy to make the actual PR if the action is decided.

@ronag
Copy link
Member

ronag commented Apr 19, 2021

This is a fundamental problem with the way req/res works (i.e. destroying one destroys the other). I would like us to consider a way to opt out (non breaking) from this behavior into a more "sensible" behavior, e.g. by introducing a stream.destroy function that would call a destroy function behind a symbol if available, or something along those lines. This could also help with interop on "broken" streams that don't handle destroyed state properly etc... a little like https://github.com/nodejs/undici/blob/main/lib/core/util.js#L125.

@benjamingr
Copy link
Member

@ronag yeah the "how" of the solution I suggested would be to add a symbol for async iterator stream destruction and call the function for that symbol first if it's defined. Then define it on IncomingMessage to opt out of destroying the socket.

So it sounds pretty similar but you solution goes through an intermediary so it's generic and not just for async iterators.

@ronag
Copy link
Member

ronag commented Apr 19, 2021

@ronag yeah the "how" of the solution I suggested would be to add a symbol for async iterator stream destruction and call the function for that symbol first if it's defined. Then define it on IncomingMessage to opt out of destroying the socket.

So it sounds pretty similar but you solution goes through an intermediary so it's generic and not just for async iterators.

Yes, and I think createAsyncIterator and pipeline should use this stream.destroy helper. That should resolve this and similar issues.

function destroy (stream, err) {
  if (stream.destroyed || stream[kDestroyed]) {
    return
  }

  // Fallback to abort() or close() if no destroy is available?
  if (typeof stream[kDestroy] === 'function') {
    stream[kDestroy](err)
  } else if (typeof stream.destroy === 'function') {
    stream.destroy(err)
  } else if (err) {
    process.nextTick((stream, err) => {
      stream.emit('error', err)
    }, stream, err)
  }

  if (stream.destroyed !== true) {
    stream[kDestroyed] = true
  }
}

@benjamingr
Copy link
Member

That sounds like a reasonable approach to me.

@misos1
Copy link
Author

misos1 commented Apr 19, 2021

  • When .destroy is called like normal - if that flag is set don't .destroy the socket so the response can still happen.
    I have a proposed solution above, it's pretty easy (don't .destroy the socket when closing the iterator in a for await). This is more blocked on deciding what to do than code - I'm happy to make the actual PR if the action is decided.

But eventually at some point it has to be destroyed or dumped (read to the end ignoring chunks), yes? Sending response will not do it anymore if something already started reading request (as happens with my example with pipe): 3d480dc

Alternatively: we can also have a separate for await implementation for IncomingMessage.

I focused mainly on for await but it would be great to also fix http vs pipeline. For example pipeline(req, dest_stream, ...) will destroy the request when dest_stream fails and emits error so there is no chance to send a normal error response to the client. Or similarly when one want to send stream to response pipeline(src_stream, res, ...) when something on src_stream fails then is also not possible to send response as described in another issue: #26311

if you for await a request - the for await closes the request, which closes the socket - which means you can't write code like:

for await(const chunk of req) {
  await process(chunk);
}
res.write(someBuffer);
res.end(200);

I tested this (v15.12.0) and req.destroyed is false after for await and response is sent normally. It destroys req when there is a break or throw inside for await.

@RafaelGSS
Copy link
Member

Looks like this behavior was solved in the v17:

$ nvm use v17.1.0
Now using node v17.1.0 (npm v8.1.2)
$ rafaelgss@rafaelgss-desktop:~/repos/os/node$ node http-async-pipeline.js
server log: payload too large
client received: payload too large

Could you confirm @misos1?

@RafaelGSS
Copy link
Member

I have done a git bisect and apparently it #38505 solves that.

@misos1
Copy link
Author

misos1 commented Dec 10, 2021

Yes, my first example is seemingly working with v17. But as I warned in description and comments it occasionally causes EPIPE without keepalive or stops after first iteration when using keepalive Agent (uncomment agent on line with http.request to enable keepalive):

let http = require("http");

let server = http.createServer(async function(req, res)
{
	try
	{
		let len = 0;
		for await (let chunk of req)
		{
			len += chunk.length;
			if(len > 2) throw "payload too large";
		}
		res.end("ok");
	}
	catch(err)
	{
		console.log("server log:", err);
		res.end(err);
	}
});

(async function()
{
	await new Promise(resolve => server.listen(8888, resolve));
	let agent = new http.Agent({ keepAlive: true });
	let data = Buffer.alloc(1000000);
	for(let i = 0; ; i++)
	{
		console.log(i);
		let req = http.request({ port: 8888, method: "post"/*, agent*/ }).end(data);
		let res = await new Promise((resolve, reject) => req.on("response", resolve).on("error", reject));
		let str = "";
		for await (let chunk of res) str += chunk;
		console.log(str);
	}
}());

Without keepalive:

4635
server log: payload too large
payload too large
4636
server log: payload too large
node:internal/process/promises:246
          triggerUncaughtException(err, true /* fromPromise */);
          ^

Error: write EPIPE
    at WriteWrap.onWriteComplete [as oncomplete] (node:internal/stream_base_commons:94:16) {
  errno: -32,
  code: 'EPIPE',
  syscall: 'write'
}

Node.js v17.2.0

With keepalive:

0
server log: payload too large
payload too large
1
^C

So even on v17 with fix from #38505 my example is still not correct. Should I open a new issue?

@RafaelGSS
Copy link
Member

Yes, please.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
confirmed-bug Issues with confirmed bugs. http Issues or PRs related to the http subsystem. promises Issues and PRs related to ECMAScript promises. stream Issues and PRs related to the stream subsystem.
Projects
None yet
Development

No branches or pull requests

6 participants