From 5f335a527b6549219366fa44f4bea829e7359aaf Mon Sep 17 00:00:00 2001 From: Taylor Ninesling Date: Fri, 5 Apr 2024 10:16:57 -0500 Subject: [PATCH] Handle failed subscription payloads (#7866) Currently, the error handling for subscription events is not well-defined in the GraphQL spec, but that doesn't mean we shouldn't handle them! The existing behavior is that an error thrown from a subscription's generator will go uncaught and crash the whole server. For a transient failure, it may be preferable for consumers that we simply return an error response and continue waiting for more data from the iterator, in case the producer recovers and resumes producing valid data. However, Node's AsyncGenerator terminates once an error is thrown, even if you manually loop calling `iterator.next()`. This change wraps the iterator consumption in a `try/catch` and closes the subscription when an error is encountered. Propagating the error up to the subscriber will allow them to decide if they need to resubscribe or not, in the case of a transient error. --- .changeset/large-ladybugs-breathe.md | 5 ++ .../plugin/subscriptionCallback/index.test.ts | 70 +++++++++++++++++++ .../src/plugin/subscriptionCallback/index.ts | 69 ++++++++++-------- 3 files changed, 114 insertions(+), 30 deletions(-) create mode 100644 .changeset/large-ladybugs-breathe.md diff --git a/.changeset/large-ladybugs-breathe.md b/.changeset/large-ladybugs-breathe.md new file mode 100644 index 00000000000..7eb1d1d965a --- /dev/null +++ b/.changeset/large-ladybugs-breathe.md @@ -0,0 +1,5 @@ +--- +"@apollo/server": patch +--- + +Catch errors thrown by subscription generators, and gracefully clean up the subscription instead of crashing. diff --git a/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts b/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts index 65d380c2c11..ea12d27eeff 100644 --- a/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts +++ b/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts @@ -1102,6 +1102,62 @@ describe('SubscriptionCallbackPlugin', () => { `); }); + it('sends a `complete` with errors when a subscription throws an error', async () => { + const server = await startSubscriptionServer({ logger }); + + mockRouterCheckResponse(); + mockRouterCheckResponse(); + mockRouterCompleteResponse({ + errors: [{ message: "The subscription generator didn't catch this!" }], + }); + + const result = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql + subscription { + throwsError + } + `, + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, + }, + }, + }), + ); + expect(result.status).toEqual(200); + + jest.advanceTimersByTime(5000); + await server.stop(); + + expect(logger.orderOfOperations).toMatchInlineSnapshot(` + [ + "SubscriptionCallback[1234-cats]: Received new subscription request", + "SubscriptionManager[1234-cats]: Sending \`check\` request to router", + "SubscriptionManager[1234-cats]: \`check\` request successful", + "SubscriptionCallback[1234-cats]: Starting graphql-js subscription", + "SubscriptionCallback[1234-cats]: graphql-js subscription successful", + "SubscriptionManager[1234-cats]: Starting new heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Listening to graphql-js subscription", + "SubscriptionCallback[1234-cats]: Responding to original subscription request", + "ERROR: SubscriptionManager[1234-cats]: Generator threw an error, terminating subscription: The subscription generator didn't catch this!", + "SubscriptionManager[1234-cats]: Sending \`complete\` request to router with errors", + "SubscriptionManager: Sending \`check\` request to http://mock-router-url.com for ID: 1234-cats", + "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", + "SubscriptionManager[1234-cats]: \`complete\` request successful", + "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", + "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", + "SubscriptionManager: Heartbeat received response for ID: 1234-cats", + "SubscriptionManager: Heartbeat request successful, ID: 1234-cats", + "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + ] + `); + }); + (process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? describe.skip : describe)( 'error handling', () => { @@ -1979,6 +2035,7 @@ async function startSubscriptionServer( type Subscription { count: Int terminatesSuccessfully: Boolean + throwsError: Int } `, resolvers: { @@ -2011,6 +2068,19 @@ async function startSubscriptionServer( }, }), }, + throwsError: { + subscribe: () => ({ + [Symbol.asyncIterator]() { + return { + next: () => { + throw new Error( + "The subscription generator didn't catch this!", + ); + }, + }; + }, + }), + }, }, }, ...opts, diff --git a/packages/server/src/plugin/subscriptionCallback/index.ts b/packages/server/src/plugin/subscriptionCallback/index.ts index 33258d751e0..159f9cb7c75 100644 --- a/packages/server/src/plugin/subscriptionCallback/index.ts +++ b/packages/server/src/plugin/subscriptionCallback/index.ts @@ -557,39 +557,48 @@ class SubscriptionManager { cancelled: false, async startConsumingSubscription() { self.logger?.debug(`Listening to graphql-js subscription`, id); - for await (const payload of subscription) { - if (this.cancelled) { - self.logger?.debug( - `Subscription already cancelled, ignoring current and future payloads`, - id, - ); - // It's already been cancelled - something else has already handled - // sending the `complete` request so we don't want to `break` here - // and send it again after the loop. - return; - } + try { + for await (const payload of subscription) { + if (this.cancelled) { + self.logger?.debug( + `Subscription already cancelled, ignoring current and future payloads`, + id, + ); + // It's already been cancelled - something else has already handled + // sending the `complete` request so we don't want to `break` here + // and send it again after the loop. + return; + } - try { - await self.retryFetch({ - url: callbackUrl, - action: 'next', - id, - verifier, - payload, - }); - } catch (e) { - const originalError = ensureError(e); - self.logger?.error( - `\`next\` request failed, terminating subscription: ${originalError.message}`, - id, - ); - self.terminateSubscription(id, callbackUrl); + try { + await self.retryFetch({ + url: callbackUrl, + action: 'next', + id, + verifier, + payload, + }); + } catch (e) { + const originalError = ensureError(e); + self.logger?.error( + `\`next\` request failed, terminating subscription: ${originalError.message}`, + id, + ); + self.terminateSubscription(id, callbackUrl); + } } + // The subscription ended without errors, send the `complete` request to + // the router + self.logger?.debug(`Subscription completed without errors`, id); + await this.completeSubscription(); + } catch (e) { + const error = ensureGraphQLError(e); + self.logger?.error( + `Generator threw an error, terminating subscription: ${error.message}`, + id, + ); + this.completeSubscription([error]); } - // The subscription ended without errors, send the `complete` request to - // the router - self.logger?.debug(`Subscription completed without errors`, id); - await this.completeSubscription(); }, async completeSubscription(errors?: readonly GraphQLError[]) { if (this.cancelled) return;