Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fs: implement byob mode for readableWebStream() #46933

Merged
merged 3 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion doc/api/fs.md
Original file line number Diff line number Diff line change
Expand Up @@ -446,14 +446,22 @@ Reads data from the file and stores that in the given buffer.
If the file is not modified concurrently, the end-of-file is reached when the
number of bytes read is zero.

#### `filehandle.readableWebStream()`
#### `filehandle.readableWebStream(options)`

<!-- YAML
added: v17.0.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/46933
description: Added option to create a 'bytes' stream.
-->

> Stability: 1 - Experimental

* `options` {Object}
* `type` {string|undefined} Whether to open a normal or a `'bytes'` stream.
**Default:** `undefined`
debadree25 marked this conversation as resolved.
Show resolved Hide resolved

* Returns: {ReadableStream}

Returns a `ReadableStream` that may be used to read the files data.
Expand Down
77 changes: 61 additions & 16 deletions lib/internal/fs/promises.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const {
SafePromisePrototypeFinally,
Symbol,
Uint8Array,
FunctionPrototypeBind,
} = primordials;

const { fs: constants } = internalBinding('constants');
Expand Down Expand Up @@ -249,29 +250,73 @@ class FileHandle extends EventEmitterMixin(JSTransferable) {
* } ReadableStream
* @returns {ReadableStream}
*/
readableWebStream() {
readableWebStream(options = kEmptyObject) {
if (this[kFd] === -1)
throw new ERR_INVALID_STATE('The FileHandle is closed');
if (this[kClosePromise])
throw new ERR_INVALID_STATE('The FileHandle is closing');
if (this[kLocked])
throw new ERR_INVALID_STATE('The FileHandle is locked');
this[kLocked] = true;
const {
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');
const readable = newReadableStreamFromStreamBase(
this[kHandle],
undefined,
{ ondone: () => this[kUnref]() });

const {
readableStreamCancel,
} = require('internal/webstreams/readablestream');
this[kRef]();
this.once('close', () => {
readableStreamCancel(readable);
});

if (options.type !== undefined) {
validateString(options.type, 'options.type');
}

let readable;

if (options.type !== 'bytes') {
const {
newReadableStreamFromStreamBase,
} = require('internal/webstreams/adapters');
readable = newReadableStreamFromStreamBase(
this[kHandle],
undefined,
{ ondone: () => this[kUnref]() });

const {
readableStreamCancel,
} = require('internal/webstreams/readablestream');
this[kRef]();
this.once('close', () => {
readableStreamCancel(readable);
});
} else {
const {
readableStreamCancel,
ReadableStream,
} = require('internal/webstreams/readablestream');

const readFn = FunctionPrototypeBind(this.read, this);
const ondone = FunctionPrototypeBind(this[kUnref], this);

readable = new ReadableStream({
type: 'bytes',
autoAllocateChunkSize: 16384,

async pull(controller) {
const view = controller.byobRequest.view;
const { bytesRead } = await readFn(view, view.byteOffset, view.byteLength);

if (bytesRead === 0) {
ondone();
controller.close();
}

controller.byobRequest.respond(bytesRead);
},

cancel() {
ondone();
},
});

this[kRef]();

this.once('close', () => {
readableStreamCancel(readable);
});
}

return readable;
}
Expand Down
84 changes: 84 additions & 0 deletions test/parallel/test-filehandle-readablestream.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,87 @@ const check = readFileSync(__filename, { encoding: 'utf8' });
mc.port1.close();
await file.close();
})().then(common.mustCall());

// Make sure 'bytes' stream works
(async () => {
const file = await open(__filename);
const dec = new TextDecoder();
const readable = file.readableWebStream({ type: 'bytes' });
const reader = readable.getReader({ mode: 'byob' });

let data = '';
let result;
do {
const buff = new ArrayBuffer(100);
result = await reader.read(new DataView(buff));
if (result.value !== undefined) {
data += dec.decode(result.value);
assert.ok(result.value.byteLength <= 100);
}
} while (!result.done);

assert.strictEqual(check, data);

assert.throws(() => file.readableWebStream(), {
code: 'ERR_INVALID_STATE',
});

await file.close();
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream 'bytes' stream
// fails if the FileHandle is already closed.
(async () => {
const file = await open(__filename);
await file.close();

assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
code: 'ERR_INVALID_STATE',
});
})().then(common.mustCall());

// Make sure that acquiring a ReadableStream 'bytes' stream
// fails if the FileHandle is already closing.
(async () => {
const file = await open(__filename);
file.close();

assert.throws(() => file.readableWebStream({ type: 'bytes' }), {
code: 'ERR_INVALID_STATE',
});
})().then(common.mustCall());

// Make sure the 'bytes' ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream({ type: 'bytes' });
const reader = readable.getReader({ mode: 'byob' });
file.close();
await reader.closed;
})().then(common.mustCall());

// Make sure the 'bytes' ReadableStream is closed when the underlying
// FileHandle is closed.
(async () => {
const file = await open(__filename);
const readable = file.readableWebStream({ type: 'bytes' });
file.close();
const reader = readable.getReader({ mode: 'byob' });
await reader.closed;
})().then(common.mustCall());

// Make sure that the FileHandle is properly marked "in use"
// when a 'bytes' ReadableStream has been acquired for it.
(async () => {
const file = await open(__filename);
file.readableWebStream({ type: 'bytes' });
const mc = new MessageChannel();
mc.port1.onmessage = common.mustNotCall();
assert.throws(() => mc.port2.postMessage(file, [file]), {
code: 25,
name: 'DataCloneError',
});
mc.port1.close();
await file.close();
})().then(common.mustCall());