Skip to content

Commit

Permalink
src: introduce PullAll method to speed up blob.text/arrayBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
debadree25 committed Apr 15, 2024
1 parent 0debfa4 commit 8415d63
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 22 deletions.
33 changes: 11 additions & 22 deletions lib/internal/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -276,28 +276,17 @@ class Blob {

const { promise, resolve, reject } = createDeferredPromise();
const reader = this[kHandle].getReader();
const buffers = [];
const readNext = () => {
reader.pull((status, buffer) => {
if (status === 0) {
// EOS, concat & resolve
// buffer should be undefined here
resolve(concat(buffers));
return;
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
reject(error);
return;
}
if (buffer !== undefined)
buffers.push(buffer);
queueMicrotask(() => readNext());
});
};
readNext();
reader.pullAll((status, buffer) => {
if (status === 0) {
resolve(buffer);
} else if (status < 0) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
const error = lazyDOMException('The blob could not be read', 'NotReadableError');
reject(error);
}
});
return promise;
}

Expand Down
86 changes: 86 additions & 0 deletions src/node_blob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ Local<FunctionTemplate> Blob::Reader::GetConstructorTemplate(Environment* env) {
BaseObject::kInternalFieldCount);
tmpl->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "BlobReader"));
SetProtoMethod(env->isolate(), tmpl, "pull", Pull);
SetProtoMethod(env->isolate(), tmpl, "pullAll", PullAll);
env->set_blob_reader_constructor_template(tmpl);
}
return tmpl;
Expand Down Expand Up @@ -393,6 +394,90 @@ void Blob::Reader::Pull(const FunctionCallbackInfo<Value>& args) {
std::move(next), node::bob::OPTIONS_END, nullptr, 0));
}

void Blob::Reader::PullAll(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Blob::Reader* reader;
ASSIGN_OR_RETURN_UNWRAP(&reader, args.Holder());

CHECK(args[0]->IsFunction());
Local<Function> fn = args[0].As<Function>();
CHECK(!fn->IsConstructor());

if (reader->eos_) {
Local<Value> arg = Int32::New(env->isolate(), bob::STATUS_EOS);
reader->MakeCallback(fn, 1, &arg);
return;
}

struct View {
std::shared_ptr<BackingStore> store;
size_t length;
size_t offset = 0;
};

struct Impl {
BaseObjectPtr<Blob::Reader> reader;
Global<Function> callback;
Environment* env;
std::function<void()> enqueue_cb;
std::vector<View> views;
size_t total_size = 0;
};

std::shared_ptr<Impl> impl_ptr = std::make_shared<Impl>();
impl_ptr->reader = BaseObjectPtr<Blob::Reader>(reader);
impl_ptr->callback.Reset(env->isolate(), fn);
impl_ptr->env = env;

auto next = [impl_ptr](int status, const DataQueue::Vec* vecs, size_t count, bob::Done doneCb) mutable {
Environment* env = impl_ptr->env;
HandleScope handleScope(env->isolate());
Local<Function> fn = impl_ptr->callback.Get(env->isolate());

if (status == bob::STATUS_EOS) impl_ptr->reader->eos_ = true;

if (count > 0) {
size_t total = 0;
for (size_t n = 0; n < count; n++) total += vecs[n].len;

// contructing array buffer requires a shared_ptr
std::shared_ptr<BackingStore> store = v8::ArrayBuffer::NewBackingStore(env->isolate(), total);
auto ptr = static_cast<uint8_t*>(store->Data());
for (size_t n = 0; n < count; n++) {
std::copy(vecs[n].base, vecs[n].base + vecs[n].len, ptr);
ptr += vecs[n].len;
}
doneCb(0);
impl_ptr->views.emplace_back(View{store, total});
impl_ptr->total_size += total;
}

if (status > 0) {
impl_ptr->enqueue_cb();
} else {
std::shared_ptr<BackingStore> store = ArrayBuffer::NewBackingStore(env->isolate(), impl_ptr->total_size);
auto ptr = static_cast<uint8_t*>(store->Data());
auto views = impl_ptr->views;
for (const auto& view : views) {
uint8_t* from = static_cast<uint8_t*>(view.store->Data()) + view.offset;
std::copy(from, from + view.length, ptr);
ptr += view.length;
}
Local<Value> argv[2] = {Int32::New(env->isolate(), status), ArrayBuffer::New(env->isolate(), store)};
impl_ptr->reader->MakeCallback(fn, arraysize(argv), argv);
impl_ptr->reader.reset();
}
};

impl_ptr->enqueue_cb = [next, impl_ptr]() {
if (impl_ptr->reader && impl_ptr->reader->inner_) {
impl_ptr->reader->inner_->Pull(next, node::bob::OPTIONS_END, nullptr, 0);
}
};

impl_ptr->enqueue_cb();
}

BaseObjectPtr<BaseObject>
Blob::BlobTransferData::Deserialize(
Environment* env,
Expand Down Expand Up @@ -574,6 +659,7 @@ void Blob::RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Blob::GetDataObject);
registry->Register(Blob::RevokeObjectURL);
registry->Register(Blob::Reader::Pull);
registry->Register(Blob::Reader::PullAll);
registry->Register(Concat);
registry->Register(BlobFromFilePath);
}
Expand Down
1 change: 1 addition & 0 deletions src/node_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class Blob : public BaseObject {
static BaseObjectPtr<Reader> Create(Environment* env,
BaseObjectPtr<Blob> blob);
static void Pull(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PullAll(const v8::FunctionCallbackInfo<v8::Value>& args);

explicit Reader(Environment* env,
v8::Local<v8::Object> obj,
Expand Down

0 comments on commit 8415d63

Please sign in to comment.