v3.3.0 regression with multiple listeners #490

Closed
eugene1g opened this issue Oct 2, 2022 · 3 comments

@eugene1g
Contributor

eugene1g commented Oct 2, 2022

Hi,

Starting with v3.3.0, if I have multiple listeners bound to the same event and the DB connection is interrupted, only the most recently registered listener reconnects and gets triggered for new notify events. Additionally, the listener.state value is missing for every listener after the first one. Here's a minimal reproduction that works as expected with v3.2.4 but breaks from v3.3.0 onward:

import postgres from "postgres";
import { setTimeout } from "node:timers/promises";

const sql = postgres();

const listener1 = await sql.listen(
  "jobs",
  (x) => console.log(`listener1 got payload: "${x}"`),
  () => console.log(`listener1 onlisten()`)
);

const listener2 = await sql.listen(
  "jobs",
  (x) => console.log(`listener2 got payload: "${x}"`),
  () => console.log(`listener2 onlisten()`)
);
const listener3 = await sql.listen(
  "jobs",
  (x) => console.log(`listener3 got payload: "${x}"`),
  () => console.log(`listener3 onlisten()`)
);

console.log("notify j1", { l1: listener1.state, l2: listener2.state, l3: listener3.state });
sql.notify("jobs", "j1");
await setTimeout(100);

// Terminate the backend connection used by listener1
console.log("terminating listener1");
await sql`select pg_terminate_backend(${listener1.state.pid})`;
await setTimeout(100); // give time to reconnect

// Trigger another event
console.log("notify j2", { l1: listener1.state, l2: listener2.state, l3: listener3.state });
sql.notify("jobs", "j2");

await setTimeout(800);
await sql.end();
@eugene1g
Contributor Author

eugene1g commented Oct 2, 2022

This is the problematic change: edfa360#diff-bfe9874d239014961b1ae4e89875a6155667db834a410aaaa2ebe3cf89820556R172-R173

The cause is that when multiple listeners exist, the code establishes multiple connections in succession, and each one overrides the value in channels[name], so only the latest connection/listener remains active.
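
To make the overwrite easier to see, here is a minimal sketch of the race in isolation. It is a simplified model, not the library's actual source: fakeListenQuery, createRegistry and countListenersAfterReconnect are hypothetical names, and the timer only stands in for awaiting the real listen query. It assumes that after a reconnect all listeners re-subscribe concurrently.

```javascript
// Simplified model of the race, NOT the postgres.js source.
// fakeListenQuery is a hypothetical stand-in for awaiting sql`listen ...`.
const fakeListenQuery = (id) =>
  new Promise((resolve) => setTimeout(() => resolve({ connection: id }), 10))

function createRegistry(mergeAfterAwait) {
  const channels = {}
  let nextId = 0

  async function listen(name, fn) {
    const listener = { fn }

    // Fast path: the channel is already registered.
    if (channels[name]) {
      channels[name].listeners.push(listener)
      return channels[name].result
    }

    // Every concurrent caller reaches this await before any of them registers.
    const result = await fakeListenQuery(++nextId)

    if (mergeAfterAwait && channels[name]) {
      // Another caller registered the channel while we were waiting: join it.
      channels[name].listeners.push(listener)
      return channels[name].result
    }

    // Without the check above, this assignment discards earlier listeners.
    channels[name] = { result, listeners: [listener] }
    return result
  }

  return { channels, listen }
}

async function countListenersAfterReconnect(mergeAfterAwait) {
  const { channels, listen } = createRegistry(mergeAfterAwait)
  // Assumption: on reconnect, all three listeners re-subscribe at the same time.
  await Promise.all([1, 2, 3].map((n) => listen('jobs', () => n)))
  return channels.jobs.listeners.length
}
```

In this model countListenersAfterReconnect(false) resolves to 1, because the last assignment wins, while countListenersAfterReconnect(true) resolves to 3.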

A naive fix would be to change that logic to this -

    let result = await sql`listen ${ sql(name) }`
    if (channels[name]) {
      // another listener could have established the channel while we were waiting
      channels[name].listeners.push(listener)
      result = channels[name].result
    } else {
      channels[name] = { result, listeners: [listener] }
    }

However, that's not ideal because we would probably also need to clean up the redundant LISTEN subscriptions that get created.
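
One way to avoid the redundant subscriptions altogether might be to register the channel synchronously and share a single in-flight query between concurrent callers. The sketch below is only an illustration of that idea under stated assumptions, not the fix that was released: createDedupedRegistry, runListenQuery and demoDedupedListen are hypothetical names, and the timer stands in for the real listen query.

```javascript
// Hypothetical sketch, NOT the postgres.js implementation.
// runListenQuery is an assumed callback that issues the LISTEN and returns a promise.
function createDedupedRegistry(runListenQuery) {
  const channels = {}

  function listen(name, fn) {
    let channel = channels[name]

    if (!channel) {
      // Register before any await so concurrent callers find this entry
      // and reuse the same pending query instead of issuing their own.
      channel = channels[name] = { listeners: [], pending: runListenQuery(name) }

      // If the query fails, forget the channel so a later call can retry.
      channel.pending.catch(() => {
        if (channels[name] === channel) delete channels[name]
      })
    }

    channel.listeners.push({ fn })
    return channel.pending
  }

  return { channels, listen }
}

async function demoDedupedListen() {
  let queries = 0
  // Stand-in for the real query: counts how many times a LISTEN completes.
  const runListenQuery = (name) =>
    new Promise((resolve) =>
      setTimeout(() => resolve({ channel: name, query: ++queries }), 10))

  const { channels, listen } = createDedupedRegistry(runListenQuery)
  const results = await Promise.all([1, 2, 3].map((n) => listen('jobs', () => n)))

  return {
    queries,
    listeners: channels.jobs.listeners.length,
    sameResult: results.every((r) => r === results[0])
  }
}
```

With three concurrent callers, this model runs one query, keeps all three listeners, and hands every caller the same result object. The trade-off is that a failed query rejects for every caller sharing it.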

Here's a failing test you can add to src/tests/index.js -

t('multiple listeners work after a reconnect', async() => {
  const sql = postgres(options)
      , xs = []

  const s1 = await sql.listen('test', x => xs.push('1', x))
  const s2 = await sql.listen('test', x => xs.push('2', x))
  await sql.notify('test', 'a')
  await delay(50)
  await sql`select pg_terminate_backend(${ s1.state.pid })`
  await delay(50)
  await sql.notify('test', 'b')
  await delay(50)
  await sql.end()

  return ['1a2a1b2b', xs.join('')]
})

@porsager
Owner

porsager commented Oct 2, 2022

Thanks a lot @eugene1g !! I'll get that fixed as soon as I'm back at my comp!

@porsager
Owner

porsager commented Oct 3, 2022

Released in v3.3.1

porsager added a commit that referenced this issue Feb 1, 2023