Skip to content

Commit

Permalink
streams: avoid calls to listenerCount
Browse files Browse the repository at this point in the history
PR-URL: nodejs#50357
  • Loading branch information
ronag committed Oct 24, 2023
1 parent 8f742bb commit 6ecfbcc
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ const kHasFlowing = 1 << 23;
const kFlowing = 1 << 24;
const kHasPaused = 1 << 25;
const kPaused = 1 << 26;
const kDataListening = 1 << 27;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
Expand Down Expand Up @@ -531,8 +532,7 @@ function canPushMore(state) {
}

function addChunk(stream, state, chunk, addToFront) {
if ((state[kState] & (kFlowing | kSync)) === kFlowing && state.length === 0 &&
stream.listenerCount('data') > 0) {
if ((state[kState] & (kFlowing | kSync | kDataListening)) === (kFlowing | kDataListening) && state.length === 0) {
// Use the guard to avoid creating `Set()` repeatedly
// when we have multiple pipes.
if ((state[kState] & kMultiAwaitDrain) !== 0) {
Expand Down Expand Up @@ -1058,7 +1058,7 @@ function pipeOnDrain(src, dest) {
}

if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
src.listenerCount('data')) {
(state[kState] & kDataListening) !== 0) {
src.resume();
}
};
Expand Down Expand Up @@ -1105,6 +1105,8 @@ Readable.prototype.on = function(ev, fn) {
const state = this._readableState;

if (ev === 'data') {
state[kState] |= kDataListening;

// Update readableListening so that resume() may be a no-op
// a few lines down. This is needed to support once('readable').
state.readableListening = this.listenerCount('readable') > 0;
Expand All @@ -1131,6 +1133,8 @@ Readable.prototype.on = function(ev, fn) {
Readable.prototype.addListener = Readable.prototype.on;

Readable.prototype.removeListener = function(ev, fn) {
const state = this._readableState;

const res = Stream.prototype.removeListener.call(this,
ev, fn);

Expand All @@ -1142,6 +1146,8 @@ Readable.prototype.removeListener = function(ev, fn) {
// resume within the same tick will have no
// effect.
process.nextTick(updateReadableListening, this);
} else if (ev === 'data' && this.listenerCount('data') === 0) {
state[kState] &= ~kDataListening;
}

return res;
Expand Down Expand Up @@ -1175,7 +1181,7 @@ function updateReadableListening(self) {
state[kState] |= kHasFlowing | kFlowing;

// Crude way to check if we should resume.
} else if (self.listenerCount('data') > 0) {
} else if ((state[kState] & kDataListening) !== 0) {
self.resume();
} else if ((state[kState] & kReadableListening) === 0) {
state[kState] &= ~(kHasFlowing | kFlowing);
Expand Down

0 comments on commit 6ecfbcc

Please sign in to comment.