Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib: use Writable#_final for 'finish' tracking #27

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 38 additions & 59 deletions lib/Dicer.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,44 +48,8 @@ class Dicer extends Writable {
this._part.emit('header', header);
});
this._hparser.on('error', (err) => {
if (this._part && !this._ignoreData) {
this._part.emit('error', err);
this._part.push(null);
}
});
}

emit(ev) {
if (ev !== 'finish' || this._realFinish) {
Writable.prototype.emit.apply(this, arguments);
return;
}

if (this._finished)
return;

process.nextTick(() => {
this.emit('error', new Error('Unexpected end of multipart data'));

if (this._part && !this._ignoreData) {
const type = (this._isPreamble ? 'Preamble' : 'Part');
this._part.emit(
'error',
new Error(`${type} terminated early due to `
+ 'unexpected end of multipart data')
);
this._part.push(null);
process.nextTick(() => {
this._realFinish = true;
this.emit('finish');
this._realFinish = false;
});
return;
}

this._realFinish = true;
this.emit('finish');
this._realFinish = false;
if (this._part && !this._ignoreData)
this._part.destroy(err);
});
}

Expand All @@ -97,9 +61,7 @@ class Dicer extends Writable {
if (this._headerFirst && this._isPreamble) {
if (!this._part) {
this._part = new PartStream(this._partOpts);
if (this._events.preamble)
this.emit('preamble', this._part);
else
if (!this.emit('preamble', this._part))
ignore(this);
}
const r = this._hparser.push(data);
Expand All @@ -123,6 +85,34 @@ class Dicer extends Writable {
cb();
}

_final(cb) {
if (this._finished) {
if (this._parts !== 0) {
this._pause = true;
this._cb = cb;
return;
}

cb();
return;
}

if (this._part && !this._ignoreData) {
const type = (this._isPreamble ? 'Preamble' : 'Part');
this._part.destroy(
new Error(`${type} terminated early due to `
+ 'unexpected end of multipart data')
);
ignore(this);
}

// Node <= 12 compatibility, otherwise `part`'s 'error'/'end' have no chance
// to emit.
process.nextTick(() => {
cb(new Error('Unexpected end of multipart data'));
});
}

reset() {
this._part = undefined;
this._bparser = undefined;
Expand Down Expand Up @@ -154,16 +144,14 @@ function onInfo(isMatch, data, start, end) {
}
}
if (this._dashes === 2) {
if ((start + i) < end && this._events.trailer)
if ((start + i) < end && this.listenerCount('trailer'))
this.emit('trailer', data.slice(start + i, end));
this.reset();
this._finished = true;
// No more parts will be added
if (this._parts === 0) {
this._realFinish = true;
this.emit('finish');
this._realFinish = false;
}
if (this._parts === 0)
unpause(this);

}
if (this._dashes)
return;
Expand All @@ -176,9 +164,7 @@ function onInfo(isMatch, data, start, end) {
unpause(this);
};
ev = this._isPreamble ? 'preamble' : 'part';
if (this._events[ev])
this.emit(ev, this._part);
else
if (!this.emit(ev, this._part))
ignore(this);
if (!this._isPreamble)
this._inHeader = true;
Expand All @@ -205,15 +191,8 @@ function onInfo(isMatch, data, start, end) {
} else {
++this._parts;
this._part.on('end', () => {
if (--this._parts === 0) {
if (this._finished) {
this._realFinish = true;
this.emit('finish');
this._realFinish = false;
} else {
unpause(this);
}
}
if (--this._parts === 0)
unpause(this);
});
}
this._part.push(null);
Expand Down
49 changes: 33 additions & 16 deletions test/test-multipart.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ function next() {
header: undefined
};

const onPreambleEnd = () => {
if (preamble.body)
preamble.body = Buffer.concat(preamble.body, preamble.bodylen);
if (preamble.body || preamble.header)
state.preamble = preamble;
};

p.on('header', (h) => {
preamble.header = h;
if (v.setBoundary)
Expand All @@ -100,12 +107,8 @@ function next() {
preamble.bodylen += data.length;
}).on('error', (err) => {
preamble.error = err;
}).on('end', () => {
if (preamble.body)
preamble.body = Buffer.concat(preamble.body, preamble.bodylen);
if (preamble.body || preamble.header)
state.preamble = preamble;
});
onPreambleEnd();
}).on('end', onPreambleEnd);
});
dicer.on('part', (p) => {
const part = {
Expand All @@ -115,6 +118,12 @@ function next() {
header: undefined
};

const onPartEnd = () => {
if (part.body)
part.body = Buffer.concat(part.body, part.bodylen);
state.parts.push(part);
};

p.on('header', (h) => {
part.header = h;
}).on('data', (data) => {
Expand All @@ -126,15 +135,23 @@ function next() {
}).on('error', (err) => {
part.error = err;
++partErrors;
}).on('end', () => {
if (part.body)
part.body = Buffer.concat(part.body, part.bodylen);
state.parts.push(part);
});
}).on('error', (err) => {
error = err;
}).on('finish', () => {
assert(finishes++ === 0, makeMsg(v.what, 'finish emitted multiple times'));
onPartEnd();
}).on('end', onPartEnd);
}).on('error', onFinish).on('finish', onFinish);

function onFinish(err) {
if (err) {
assert(
error === undefined,
makeMsg(v.what, 'error emitted multiple times')
);
error = err;
}

// Node <= 12 emits both 'error' and 'end', while Node > 14 emits only
// 'error'.
if (finishes++ > 0)
return;

if (v.dicerError) {
assert(error !== undefined, makeMsg(v.what, 'Expected error'));
Expand Down Expand Up @@ -242,7 +259,7 @@ function next() {
}
++t;
next();
});
}

fs.createReadStream(fixtureBase + '/original').pipe(dicer);
}
Expand Down
30 changes: 30 additions & 0 deletions test/test-pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict';

const assert = require('assert');

const { Readable, pipeline } = require('stream');
const Dicer = require('..');

const r = new Readable({ read() {} });
const d = new Dicer({ boundary: 'a' });

let isFinished = false;

d.on('part', async (part) => {
part.resume();
});

r.push('--a\r\nA: 1\r\nB: 1\r\n\r\n123\r\n--a\r\n\r\n456\r\n--a--\r\n');
setImmediate(() => {
r.push(null);
});

pipeline(r, d, (error) => {
assert(isFinished === false, 'Double-invocation of pipeline callback');
assert(error === undefined, 'Unexpected pipeline error');
isFinished = true;
});

process.on('exit', () => {
assert(isFinished === true, 'Should finish before exiting');
});