Skip to content

Commit

Permalink
Support both versions
Browse files Browse the repository at this point in the history
  • Loading branch information
vweevers committed Jun 18, 2024
1 parent cb4b436 commit 0874891
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 16 deletions.
62 changes: 54 additions & 8 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
const { Readable } = require('readable-stream')

const kIterator = Symbol('iterator')
const kPromises = Symbol('promises')
const kNextv = Symbol('nextv')
const kNextvLegacy = Symbol('nextvLegacy')
const kDestroy = Symbol('destroy')

class LevelReadStream extends Readable {
Expand All @@ -17,8 +19,15 @@ class LevelReadStream extends Readable {

this[kIterator] = db[method](rest)
this[kNextv] = this[kNextv].bind(this)
this[kNextvLegacy] = this[kNextvLegacy].bind(this)
this[kDestroy] = this.destroy.bind(this)

// Detect abstract-level 2 by the presence of hooks. Version 2 doesn't
// support callbacks anymore. Version 1 does also support promises but
// that would be slower because it works by wrapping the callback API.
// TODO: test against memory-level once it has upgraded.
this[kPromises] = db.hooks !== undefined

// NOTE: use autoDestroy option when it lands in readable-stream
this.once('end', this.destroy.bind(this, null, null))
}
Expand All @@ -29,10 +38,28 @@ class LevelReadStream extends Readable {

_read (size) {
if (this.destroyed) return
this[kIterator].nextv(size).then(
this[kNextv],
this[kDestroy]
)

if (this[kPromises]) {
this[kIterator].nextv(size).then(
this[kNextv],
this[kDestroy]
)
} else {
this[kIterator].nextv(size, this[kNextvLegacy])
}
}

[kNextvLegacy] (err, items) {
if (this.destroyed) return
if (err) return this.destroy(err)

if (items.length === 0) {
this.push(null)
} else {
for (const item of items) {
this.push(item)
}
}
}

[kNextv] (items) {
Expand All @@ -48,10 +75,16 @@ class LevelReadStream extends Readable {
}

_destroy (err, callback) {
this[kIterator].close().then(
err ? () => callback(err) : callback,
callback
)
if (this[kPromises]) {
this[kIterator].close().then(
err ? () => callback(err) : callback,
callback
)
} else {
this[kIterator].close(function (err2) {
callback(err || err2)
})
}
}
}

Expand All @@ -60,6 +93,19 @@ class EntryStream extends LevelReadStream {
super(db, 'iterator', { ...options, keys: true, values: true })
}

[kNextvLegacy] (err, entries) {
if (this.destroyed) return
if (err) return this.destroy(err)

if (entries.length === 0) {
this.push(null)
} else {
for (const [key, value] of entries) {
this.push({ key, value })
}
}
}

[kNextv] (entries) {
if (this.destroyed) return

Expand Down
16 changes: 8 additions & 8 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
})
})

test.skip(name + ': destroy() during iterator.nextv', function (t) {
test(name + ': destroy() during iterator.nextv', function (t) {
const stream = new Ctor(db)
const order = monitor(stream, function () {
t.same(order, ['_close', 'close'], 'event order')
Expand All @@ -149,7 +149,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
stream.resume()
})

test.skip(name + ': destroy(err) during iterator.nextv', function (t) {
test(name + ': destroy(err) during iterator.nextv', function (t) {
const stream = new Ctor(db)
const order = monitor(stream, function () {
t.same(order, ['_close', 'error: user', 'close'], 'event order')
Expand All @@ -163,7 +163,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
stream.resume()
})

test.skip(name + ': destroy(err, callback) during iterator.nextv', function (t) {
test(name + ': destroy(err, callback) during iterator.nextv', function (t) {
const stream = new Ctor(db)

const order = monitor(stream, function () {
Expand All @@ -181,7 +181,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
stream.resume()
})

test.skip(name + ': destroy(null, callback) during iterator.nextv', function (t) {
test(name + ': destroy(null, callback) during iterator.nextv', function (t) {
const stream = new Ctor(db)

const order = monitor(stream, function () {
Expand All @@ -199,7 +199,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
stream.resume()
})

test.skip(name + ': destroy during iterator.nextv 1', function (t) {
test(name + ': destroy during iterator.nextv 1', function (t) {
const stream = new Ctor(db)
const iterator = db[kLastIterator]
const nextv = iterator.nextv.bind(iterator)
Expand All @@ -214,7 +214,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
stream.on('close', t.end.bind(t))
})

test.skip(name + ': destroy during iterator.nextv 2', function (t) {
test(name + ': destroy during iterator.nextv 2', function (t) {
const stream = new Ctor(db)
const iterator = db[kLastIterator]
const nextv = iterator.nextv.bind(iterator)
Expand All @@ -232,7 +232,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
stream.on('close', t.end.bind(t))
})

test.skip(name + ': destroy after iterator.nextv 1', function (t) {
test(name + ': destroy after iterator.nextv 1', function (t) {
const stream = new Ctor(db)
const iterator = db[kLastIterator]
const nextv = iterator.nextv.bind(iterator)
Expand All @@ -249,7 +249,7 @@ for (const Ctor of [EntryStream, KeyStream, ValueStream]) {
stream.on('close', t.end.bind(t))
})

test.skip(name + ': destroy after iterator.nextv 2', function (t) {
test(name + ': destroy after iterator.nextv 2', function (t) {
const stream = new Ctor(db)
const iterator = db[kLastIterator]
const nextv = iterator.nextv.bind(iterator)
Expand Down

0 comments on commit 0874891

Please sign in to comment.