From f73b22f96e8f5c80d920689e65ac3dcd7c19e1bc Mon Sep 17 00:00:00 2001 From: Brian C Date: Tue, 17 Sep 2024 09:50:17 -0500 Subject: [PATCH] Handle bad message ordering - make it catchable. Fixes 3174 (#3289) * Handle bad message ordering - make it catchable. Fixes 3174 * Close client in test * Mess w/ github action settings * update ci config * Remove redundant tests * Update code to use handle error event * Add tests for commandComplete message being out of order * Lint fix * Fix native tests * Fix lint again...airport computer not my friend * Not a native issue --- .github/workflows/ci.yml | 7 +- packages/pg-native/test/many-connections.js | 13 +- packages/pg/lib/client.js | 12 +- packages/pg/script/create-test-tables.js | 57 +++--- .../test/integration/gh-issues/3174-tests.js | 167 ++++++++++++++++++ 5 files changed, 203 insertions(+), 53 deletions(-) create mode 100644 packages/pg/test/integration/gh-issues/3174-tests.js diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4ad39305b..f6e93d71e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: - run: yarn install --frozen-lockfile - run: yarn lint build: - timeout-minutes: 10 + timeout-minutes: 15 needs: lint services: postgres: @@ -44,8 +44,8 @@ jobs: - '22' os: - ubuntu-latest - name: Node.js ${{ matrix.node }} (${{ matrix.os }}) - runs-on: ${{ matrix.os }} + name: Node.js ${{ matrix.node }} + runs-on: ubuntu-latest env: PGUSER: postgres PGPASSWORD: postgres @@ -71,5 +71,4 @@ jobs: node-version: ${{ matrix.node }} cache: yarn - run: yarn install --frozen-lockfile - # TODO(bmc): get ssl tests working in ci - run: yarn test diff --git a/packages/pg-native/test/many-connections.js b/packages/pg-native/test/many-connections.js index 204199666..fe32ede16 100644 --- a/packages/pg-native/test/many-connections.js +++ b/packages/pg-native/test/many-connections.js @@ -6,7 +6,7 @@ var bytes = require('crypto').pseudoRandomBytes describe('many connections', function () { describe('async', function () { var test = function (count, times) { - it('connecting ' + count + ' clients ' + times, function (done) { + it(`connecting ${count} clients ${times} times`, function (done) { this.timeout(200000) var connectClient = function (n, cb) { @@ -38,20 +38,9 @@ describe('many connections', function () { } test(1, 1) - test(1, 1) - test(1, 1) - test(5, 5) test(5, 5) - test(5, 5) - test(5, 5) - test(10, 10) test(10, 10) - test(10, 10) - test(20, 20) - test(20, 20) test(20, 20) test(30, 10) - test(30, 10) - test(30, 10) }) }) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index e4720114e..527f62e4f 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -377,11 +377,21 @@ class Client extends EventEmitter { } _handleCommandComplete(msg) { + if (this.activeQuery == null) { + const error = new Error('Received unexpected commandComplete message from backend.') + this._handleErrorEvent(error) + return + } // delegate commandComplete to active query this.activeQuery.handleCommandComplete(msg, this.connection) } - _handleParseComplete(msg) { + _handleParseComplete() { + if (this.activeQuery == null) { + const error = new Error('Received unexpected parseComplete message from backend.') + this._handleErrorEvent(error) + return + } // if a prepared statement has a name and properly parses // we track that its already been executed so we don't parse // it again on the same client diff --git a/packages/pg/script/create-test-tables.js b/packages/pg/script/create-test-tables.js index c4ec99f23..622b0cd20 100644 --- a/packages/pg/script/create-test-tables.js +++ b/packages/pg/script/create-test-tables.js @@ -31,41 +31,26 @@ var people = [ { name: 'Zanzabar', age: 260 }, ] -var con = new pg.Client({ - user: args.user, - password: args.password, - host: args.host, - port: args.port, - database: args.database, -}) - -con.connect((err) => { - if (err) { - throw err - } - - con.query( - 'DROP TABLE IF EXISTS person;' + ' CREATE TABLE person (id serial, name varchar(10), age integer)', - (err) => { - if (err) { - throw err - } - - console.log('Created table person') - console.log('Filling it with people') - - con.query( - 'INSERT INTO person (name, age) VALUES' + - people.map((person) => ` ('${person.name}', ${person.age})`).join(','), - (err, result) => { - if (err) { - throw err - } - - console.log(`Inserted ${result.rowCount} people`) - con.end() - } - ) - } +async function run() { + var con = new pg.Client({ + user: args.user, + password: args.password, + host: args.host, + port: args.port, + database: args.database, + }) + console.log('creating test dataset') + await con.connect() + await con.query('DROP TABLE IF EXISTS person') + await con.query('CREATE TABLE person (id serial, name varchar(10), age integer)') + await con.query( + 'INSERT INTO person (name, age) VALUES' + people.map((person) => ` ('${person.name}', ${person.age})`).join(',') ) + await con.end() + console.log('created test dataset') +} + +run().catch((e) => { + console.log('setup failed', e) + process.exit(255) }) diff --git a/packages/pg/test/integration/gh-issues/3174-tests.js b/packages/pg/test/integration/gh-issues/3174-tests.js new file mode 100644 index 000000000..49ac5905a --- /dev/null +++ b/packages/pg/test/integration/gh-issues/3174-tests.js @@ -0,0 +1,167 @@ +const net = require('net') +const buffers = require('../../test-buffers') +const helper = require('../test-helper') +const assert = require('assert') +const cli = require('../../cli') + +const suite = new helper.Suite() + +const options = { + host: 'localhost', + port: Math.floor(Math.random() * 2000) + 2000, + connectionTimeoutMillis: 2000, + user: 'not', + database: 'existing', +} + +const startMockServer = (port, badBuffer, callback) => { + const sockets = new Set() + + const server = net.createServer((socket) => { + sockets.add(socket) + socket.once('end', () => sockets.delete(socket)) + + socket.on('data', (data) => { + // deny request for SSL + if (data.length === 8) { + socket.write(Buffer.from('N', 'utf8')) + return + // consider all authentication requests as good + } + // the initial message coming in has a 0 message type for authentication negotiation + if (!data[0]) { + socket.write(buffers.authenticationOk()) + // send ReadyForQuery `timeout` ms after authentication + socket.write(buffers.readyForQuery()) + return + // respond with our canned response + } + const code = data.toString('utf8', 0, 1) + switch (code) { + // parse + case 'P': + socket.write(buffers.parseComplete()) + socket.write(buffers.bindComplete()) + socket.write(buffers.rowDescription()) + socket.write(buffers.dataRow()) + socket.write(buffers.commandComplete('FOO BAR')) + socket.write(buffers.readyForQuery()) + // this message is invalid, but sometimes sent out of order when using proxies or pg-bouncer + setImmediate(() => { + socket.write(badBuffer) + }) + break + case 'Q': + socket.write(buffers.rowDescription()) + socket.write(buffers.dataRow()) + socket.write(buffers.commandComplete('FOO BAR')) + socket.write(buffers.readyForQuery()) + // this message is invalid, but sometimes sent out of order when using proxies or pg-bouncer + setImmediate(() => { + socket.write(badBuffer) + }) + default: + // console.log('got code', code) + } + }) + }) + + const closeServer = () => { + for (const socket of sockets) { + socket.destroy() + } + return new Promise((resolve) => { + server.close(resolve) + }) + } + + server.listen(port, options.host, () => callback(closeServer)) +} + +const delay = (ms) => + new Promise((resolve) => { + setTimeout(resolve, ms) + }) + +const testErrorBuffer = (bufferName, errorBuffer) => { + suite.testAsync(`Out of order ${bufferName} on simple query is catchable`, async () => { + const closeServer = await new Promise((resolve, reject) => { + return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer)) + }) + const client = new helper.Client(options) + await client.connect() + + let errorHit = false + client.on('error', () => { + errorHit = true + }) + + await client.query('SELECT NOW()') + await delay(50) + + // the native client only emits a notice message and keeps on its merry way + if (!cli.native) { + assert(errorHit) + // further queries on the client should fail since its in an invalid state + await assert.rejects(() => client.query('SELECTR NOW()'), 'Further queries on the client should reject') + } + + await closeServer() + }) + + suite.testAsync(`Out of order ${bufferName} on extended query is catchable`, async () => { + const closeServer = await new Promise((resolve, reject) => { + return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer)) + }) + const client = new helper.Client(options) + await client.connect() + + let errorHit = false + client.on('error', () => { + errorHit = true + }) + + await client.query('SELECT $1', ['foo']) + await delay(40) + + // the native client only emits a notice message and keeps on its merry way + if (!cli.native) { + assert(errorHit) + // further queries on the client should fail since its in an invalid state + await assert.rejects(() => client.query('SELECTR NOW()'), 'Further queries on the client should reject') + } + + await client.end() + + await closeServer() + }) + + suite.testAsync(`Out of order ${bufferName} on pool is catchable`, async () => { + const closeServer = await new Promise((resolve, reject) => { + return startMockServer(options.port, errorBuffer, (closeServer) => resolve(closeServer)) + }) + const pool = new helper.pg.Pool(options) + + let errorHit = false + pool.on('error', () => { + errorHit = true + }) + + await pool.query('SELECT $1', ['foo']) + await delay(100) + + if (!cli.native) { + assert(errorHit) + assert.strictEqual(pool.idleCount, 0, 'Pool should have no idle clients') + assert.strictEqual(pool.totalCount, 0, 'Pool should have no connected clients') + } + + await pool.end() + await closeServer() + }) +} + +if (!helper.args.native) { + testErrorBuffer('parseComplete', buffers.parseComplete()) + testErrorBuffer('commandComplete', buffers.commandComplete('f')) +}