Skip to content

Commit

Permalink
Fix pg-query-stream
Browse files Browse the repository at this point in the history
There were some subtle bugs: the stream was implemented incorrectly and did not work as expected with async iteration. I've modified the code based on #2050 and comments in #2035 to have better test coverage of async iterables and to update the internals significantly so they more closely match the readable stream interface.

Note: this is a __breaking__ (semver major) change to this package, as the close event behavior is changed slightly and `highWaterMark` is no longer supported. It shouldn't impact most usage, but it is a breaking change regardless.
  • Loading branch information
brianc committed Dec 28, 2019
1 parent 0b87d49 commit e3cb40c
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 43 deletions.
79 changes: 46 additions & 33 deletions packages/pg-query-stream/index.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
'use strict'
var Cursor = require('pg-cursor')
var Readable = require('stream').Readable
const { Readable } = require('stream')
const Cursor = require('pg-cursor')

class PgQueryStream extends Readable {
constructor (text, values, options) {
super(Object.assign({ objectMode: true }, options))
this.cursor = new Cursor(text, values, options)
constructor(text, values, config = {}) {
const { batchSize = 100 } = config;
// https://nodejs.org/api/stream.html#stream_new_stream_readable_options
super({ objectMode: true, emitClose: true, autoDestroy: true, highWaterMark: batchSize })
this.cursor = new Cursor(text, values, config)

this._reading = false
this._closed = false
this.batchSize = (options || {}).batchSize || 100
this._callbacks = []
this._err = undefined;

// delegate Submittable callbacks to cursor
this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor)
Expand All @@ -19,40 +21,51 @@ class PgQueryStream extends Readable {
this.handleError = this.cursor.handleError.bind(this.cursor)
}

submit (connection) {
submit(connection) {
this.cursor.submit(connection)
}

close (callback) {
this._closed = true
const cb = callback || (() => this.emit('close'))
this.cursor.close(cb)
close(callback) {
if (this.destroyed) {
if (callback) setImmediate(callback)
} else {
if (callback) this.once('close', callback)
this.destroy()
}
}

_close() {
this.cursor.close((err) => {
let cb
while ((cb = this._callbacks.pop())) cb(err || this._err)
})
}

_read (size) {
if (this._reading || this._closed) {
return false
_destroy(_err, callback) {
this._err = _err;
this._callbacks.push(callback)
if (!this._reading) {
this._close()
}
}

// https://nodejs.org/api/stream.html#stream_readable_read_size_1
_read(size) {
// Prevent _destroy() from closing while reading
this._reading = true
const readAmount = Math.max(size, this.batchSize)
this.cursor.read(readAmount, (err, rows) => {
if (this._closed) {
return
}
if (err) {
return this.emit('error', err)
}
// if we get a 0 length array we've read to the end of the cursor
if (!rows.length) {
this._closed = true
setImmediate(() => this.emit('close'))
return this.push(null)
}

// push each row into the stream
this.cursor.read(size, (err, rows, result) => {
this._reading = false
for (var i = 0; i < rows.length; i++) {
this.push(rows[i])

if (this.destroyed) {
// Destroyed while reading?
this._close()
} else if (err) {
// https://nodejs.org/api/stream.html#stream_errors_while_reading
this.destroy(err)
} else {
for (const row of rows) this.push(row)
if (rows.length < size) this.push(null)
}
})
}
Expand Down
55 changes: 55 additions & 0 deletions packages/pg-query-stream/test/async-iterator.es6
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,59 @@ describe('Async iterator', () => {
assert.equal(allRows.length, 603)
await pool.end()
})

it('can break out of iteration early', async () => {
const pool = new pg.Pool({ max: 1 })
const client = await pool.connect()
const rows = []
for await (const row of client.query(new QueryStream(queryText, []))) {
rows.push(row)
break;
}
for await (const row of client.query(new QueryStream(queryText, []))) {
rows.push(row)
break;
}
for await (const row of client.query(new QueryStream(queryText, []))) {
rows.push(row)
break;
}
assert.strictEqual(rows.length, 3)
client.release()
await pool.end()
})

it('only returns rows on first iteration', async () => {
const pool = new pg.Pool({ max: 1 })
const client = await pool.connect()
const rows = []
const stream = client.query(new QueryStream(queryText, []))
for await (const row of stream) {
rows.push(row)
break;
}
for await (const row of stream) {
rows.push(row)
}
for await (const row of stream) {
rows.push(row)
}
assert.strictEqual(rows.length, 1)
client.release()
await pool.end()
})

it('can read with delays', async () => {
const pool = new pg.Pool({ max: 1 })
const client = await pool.connect()
const rows = []
const stream = client.query(new QueryStream(queryText, [], { batchSize: 1 }))
for await (const row of stream) {
rows.push(row)
await new Promise((resolve) => setTimeout(resolve, 1))
}
assert.strictEqual(rows.length, 201)
client.release()
await pool.end()
})
})
11 changes: 4 additions & 7 deletions packages/pg-query-stream/test/close.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ var helper = require('./helper')

helper('close', function (client) {
it('emits close', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], {batchSize: 2, highWaterMark: 2})
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [3], { batchSize: 2, highWaterMark: 2 })
var query = client.query(stream)
query.pipe(concat(function () {}))
query.pipe(concat(function () { }))
query.on('close', done)
})
})

helper('early close', function (client) {
it('can be closed early', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], {batchSize: 2, highWaterMark: 2})
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [20000], { batchSize: 2, highWaterMark: 2 })
var query = client.query(stream)
var readCount = 0
query.on('readable', function () {
Expand All @@ -34,7 +34,7 @@ helper('early close', function (client) {

helper('close callback', function (client) {
it('notifies an optional callback when the connection is closed', function (done) {
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], {batchSize: 2, highWaterMark: 2})
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [10], { batchSize: 2, highWaterMark: 2 })
var query = client.query(stream)
query.once('readable', function () { // only reading once
query.read()
Expand All @@ -45,8 +45,5 @@ helper('close callback', function (client) {
done()
})
})
query.on('close', function () {
assert(false, 'close event should not fire') // no close event because we did not read to the end of the stream.
})
})
})
4 changes: 1 addition & 3 deletions packages/pg-query-stream/test/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ var assert = require('assert')
var QueryStream = require('../')

var stream = new QueryStream('SELECT NOW()', [], {
highWaterMark: 999,
batchSize: 88
})

assert.equal(stream._readableState.highWaterMark, 999)
assert.equal(stream.batchSize, 88)
assert.equal(stream._readableState.highWaterMark, 88)

0 comments on commit e3cb40c

Please sign in to comment.