diff --git a/.changeset/bright-needles-suffer.md b/.changeset/bright-needles-suffer.md new file mode 100644 index 00000000000..a9308999882 --- /dev/null +++ b/.changeset/bright-needles-suffer.md @@ -0,0 +1,5 @@ +--- +"@apollo/server": patch +--- + +In the subscription callback server plugin, terminating a subscription now immediately closes the internal async generator. This avoids that generator existing after termination and until the next message is received. diff --git a/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts b/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts index a622c61770a..65d380c2c11 100644 --- a/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts +++ b/packages/server/src/__tests__/plugin/subscriptionCallback/index.test.ts @@ -185,6 +185,7 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + "SubscriptionManager[1234-cats]: Subscription completed without errors", ] `); }); @@ -289,6 +290,7 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionManager[1234-cats]: \`complete\` request successful", "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + "SubscriptionManager[1234-cats]: Subscription completed without errors", ] `); }); @@ -405,6 +407,7 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + "SubscriptionManager[1234-cats]: Subscription completed without errors", ] `); }); @@ -522,6 +525,7 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + "SubscriptionManager[1234-cats]: Subscription completed without errors", ] `); }); @@ -684,10 +688,12 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionManager[1234-cats]: \`complete\` request successful", "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Subscription completed without errors", "SubscriptionManager[5678-dogs]: \`complete\` request successful", "SubscriptionManager: Terminating subscriptions for ID: 5678-dogs", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url-2.com", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", + "SubscriptionManager[5678-dogs]: Subscription completed without errors", ] `); }); @@ -879,118 +885,120 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionManager: Heartbeat request received invalid ID: 1234-cats", "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Subscription completed without errors", "SubscriptionManager: Heartbeat received response for ID: 5678-dogs", "SubscriptionManager: Heartbeat request received invalid ID: 5678-dogs", "SubscriptionManager: Terminating subscriptions for ID: 5678-dogs", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com/5678-dogs", + "SubscriptionManager[5678-dogs]: Subscription completed without errors", "TESTING: Triggering third update", - "SubscriptionManager[1234-cats]: Subscription already cancelled, ignoring current and future payloads", - "SubscriptionManager[5678-dogs]: Subscription already cancelled, ignoring current and future payloads", "SubscriptionCallback: Server is shutting down. Cleaning up outstanding subscriptions and heartbeat intervals", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] `); }); - it('handles router termination via 404', async () => { - const server = await startSubscriptionServer({ logger }); + (process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? it.skip : it)( + 'handles router termination via 404', + async () => { + const server = await startSubscriptionServer({ logger }); - mockRouterCheckResponse(); - // Mock the heartbeat response from the router. We'll trigger it once below - // after the subscription is initialized to make sure it works. - const heartbeat = mockRouterCheckResponse(); + mockRouterCheckResponse(); + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const heartbeat = mockRouterCheckResponse(); - // Start the subscriptions; this triggers the initial check request and - // starts the heartbeat interval. This simulates an incoming subscription - // request from the router. - const result = await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + // Start the subscriptions; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql subscription { count } `, - extensions: { - subscription: { - callbackUrl: 'http://mock-router-url.com', - subscriptionId: '1234-cats', - verifier: 'my-verifier-token', + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, }, }, - }, - }), - ); + }), + ); - expect(result.status).toEqual(200); + expect(result.status).toEqual(200); - // Advance timers to trigger the heartbeat. This consumes the one - // heartbeat mock from above. - jest.advanceTimersByTime(5000); - await heartbeat; + // Advance timers to trigger the heartbeat. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await heartbeat; - // Next we'll trigger some subscription events. In advance, we'll mock the - // router responses. - const firstUpdate = mockRouterNextResponse({ payload: { count: 1 } }); - const secondUpdate = mockRouterNextResponse({ payload: { count: 2 } }); + // Next we'll trigger some subscription events. In advance, we'll mock the + // router responses. + const firstUpdate = mockRouterNextResponse({ payload: { count: 1 } }); + const secondUpdate = mockRouterNextResponse({ payload: { count: 2 } }); - // Trigger a couple updates. These send `next` requests to the router. - logger.debug('TESTING: Triggering first update'); - await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql mutation { addOne } `, - }, - }), - ); + }, + }), + ); - await firstUpdate; + await firstUpdate; - logger.debug('TESTING: Triggering second update'); - await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + logger.debug('TESTING: Triggering second update'); + await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql mutation { addOne } `, - }, - }), - ); + }, + }), + ); - await secondUpdate; + await secondUpdate; - // We've established two subscriptions are functional at this point. Now - // let's have the router invalidate them with a 404 response to the `next`. - const terminationResponse = mockRouterNextResponse({ - statusCode: 404, - payload: { count: 3 }, - }); + // We've established two subscriptions are functional at this point. Now + // let's have the router invalidate them with a 404 response to the `next`. + const terminationResponse = mockRouterNextResponse({ + statusCode: 404, + payload: { count: 3 }, + }); - // Trigger a 3rd update to make sure both subscriptions are cancelled. - logger.debug('TESTING: Triggering third (terminating) update'); - await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + // Trigger a 3rd update to make sure both subscriptions are cancelled. + logger.debug('TESTING: Triggering third (terminating) update'); + await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql mutation { addOne } `, - }, - }), - ); + }, + }), + ); - await terminationResponse; + await terminationResponse; - await server.stop(); + await server.stop(); - expect(logger.orderOfOperations).toMatchInlineSnapshot(` + expect(logger.orderOfOperations).toMatchInlineSnapshot(` [ "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", @@ -1018,7 +1026,8 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] `); - }); + }, + ); it('sends a `complete` when a subscription terminates successfully', async () => { const server = await startSubscriptionServer({ logger }); @@ -1093,39 +1102,41 @@ describe('SubscriptionCallbackPlugin', () => { `); }); - describe('error handling', () => { - it('encounters errors on initial `check`', async () => { - const server = await startSubscriptionServer({ logger }); + (process.env.INCREMENTAL_DELIVERY_TESTS_ENABLED ? describe.skip : describe)( + 'error handling', + () => { + it('encounters errors on initial `check`', async () => { + const server = await startSubscriptionServer({ logger }); - // Mock the failed check response from the router. - mockRouterCheckResponse({ - statusCode: 400, - responseBody: 'Invalid subscription ID provided', - }); + // Mock the failed check response from the router. + mockRouterCheckResponse({ + statusCode: 400, + responseBody: 'Invalid subscription ID provided', + }); - // This triggers the check request which will fail. - const result = await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + // This triggers the check request which will fail. + const result = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql subscription { count } `, - extensions: { - subscription: { - callbackUrl: 'http://mock-router-url.com', - subscriptionId: '1234-cats', - verifier: 'my-verifier-token', + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, }, }, - }, - }), - ); + }), + ); - expect(result.status).toEqual(500); - assert(result.body.kind === 'complete'); - expect(JSON.parse(result.body.string)).toMatchInlineSnapshot(` + expect(result.status).toEqual(500); + assert(result.body.kind === 'complete'); + expect(JSON.parse(result.body.string)).toMatchInlineSnapshot(` { "data": null, "errors": [ @@ -1136,14 +1147,14 @@ describe('SubscriptionCallbackPlugin', () => { } `); - // Trigger the heartbeat interval just to make sure it doesn't actually - // happen in this case (we haven't mocked it, so if it fires it will - // trigger an error and fail the test). - jest.advanceTimersByTime(5000); + // Trigger the heartbeat interval just to make sure it doesn't actually + // happen in this case (we haven't mocked it, so if it fires it will + // trigger an error and fail the test). + jest.advanceTimersByTime(5000); - await server.stop(); + await server.stop(); - expect(logger.orderOfOperations).toMatchInlineSnapshot(` + expect(logger.orderOfOperations).toMatchInlineSnapshot(` [ "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", @@ -1155,66 +1166,66 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] `); - }); + }); - it('encounters errors on subscription', async () => { - const server = await startSubscriptionServer({ logger }); + it('encounters errors on subscription', async () => { + const server = await startSubscriptionServer({ logger }); - // Mock the initial check response. - mockRouterCheckResponse(); + // Mock the initial check response. + mockRouterCheckResponse(); - const completeRequest = mockRouterCompleteResponse({ - errors: [ - { - message: - 'The subscription field "invalidSubscriptionField" is not defined.', - locations: [{ line: 3, column: 15 }], - }, - ], - }); + const completeRequest = mockRouterCompleteResponse({ + errors: [ + { + message: + 'The subscription field "invalidSubscriptionField" is not defined.', + locations: [{ line: 3, column: 15 }], + }, + ], + }); - // Trigger an invalid subscription - const response = await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + // Trigger an invalid subscription + const response = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql subscription { invalidSubscriptionField } `, - extensions: { - subscription: { - callbackUrl: 'http://mock-router-url.com', - subscriptionId: '1234-cats', - verifier: 'my-verifier-token', + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, }, }, - }, - }), - ); - expect(response.status).toEqual(400); - assert(response.body.kind === 'complete'); - expect(JSON.parse(response.body.string)).toEqual({ - errors: [ - { - message: - 'Cannot query field "invalidSubscriptionField" on type "Subscription".', - locations: [{ line: 3, column: 15 }], - extensions: { - code: 'GRAPHQL_VALIDATION_FAILED', + }), + ); + expect(response.status).toEqual(400); + assert(response.body.kind === 'complete'); + expect(JSON.parse(response.body.string)).toEqual({ + errors: [ + { + message: + 'Cannot query field "invalidSubscriptionField" on type "Subscription".', + locations: [{ line: 3, column: 15 }], + extensions: { + code: 'GRAPHQL_VALIDATION_FAILED', + }, }, - }, - ], - }); + ], + }); - // Trigger the heartbeat interval just to make sure it doesn't actually - // happen in this case (we haven't mocked it, so it'll throw an error if it - // sends a heartbeat). - jest.advanceTimersByTime(5000); + // Trigger the heartbeat interval just to make sure it doesn't actually + // happen in this case (we haven't mocked it, so it'll throw an error if it + // sends a heartbeat). + jest.advanceTimersByTime(5000); - await completeRequest; - await server.stop(); - expect(logger.orderOfOperations).toMatchInlineSnapshot(` + await completeRequest; + await server.stop(); + expect(logger.orderOfOperations).toMatchInlineSnapshot(` [ "SubscriptionManager[1234-cats]: Sending \`check\` request to router", "SubscriptionManager[1234-cats]: \`check\` request successful", @@ -1229,59 +1240,59 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] `); - }); + }); - it('handles failed heartbeats', async () => { - const server = await startSubscriptionServer({ logger }); + it('handles failed heartbeats', async () => { + const server = await startSubscriptionServer({ logger }); - // Mock the initial check response from the router. - mockRouterCheckResponse(); + // Mock the initial check response from the router. + mockRouterCheckResponse(); - // Start the subscription; this triggers the initial check request and - // starts the heartbeat interval. This simulates an incoming subscription - // request from the router. - const result = await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql subscription { count } `, - extensions: { - subscription: { - callbackUrl: 'http://mock-router-url.com', - subscriptionId: '1234-cats', - verifier: 'my-verifier-token', + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, }, }, - }, - }), - ); + }), + ); - expect(result.status).toEqual(200); + expect(result.status).toEqual(200); - // 5 failures is the limit before the heartbeat is cancelled. We expect to - // see 5 errors and then a final error indicating the heartbeat was - // cancelled in the log snapshot below. - for (let i = 0; i < 5; i++) { - // mock heartbeat response failure - nock('http://mock-router-url.com') - .matchHeader('content-type', 'application/json') - .post('/', { - kind: 'subscription', - action: 'check', - id: '1234-cats', - verifier: 'my-verifier-token', - }) - .replyWithError('network request error'); - // trigger heartbeat - jest.advanceTimersByTime(5000); - } + // 5 failures is the limit before the heartbeat is cancelled. We expect to + // see 5 errors and then a final error indicating the heartbeat was + // cancelled in the log snapshot below. + for (let i = 0; i < 5; i++) { + // mock heartbeat response failure + nock('http://mock-router-url.com') + .matchHeader('content-type', 'application/json') + .post('/', { + kind: 'subscription', + action: 'check', + id: '1234-cats', + verifier: 'my-verifier-token', + }) + .replyWithError('network request error'); + // trigger heartbeat + jest.advanceTimersByTime(5000); + } - await server.stop(); + await server.stop(); - expect(logger.orderOfOperations).toMatchInlineSnapshot(` + expect(logger.orderOfOperations).toMatchInlineSnapshot(` [ "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", @@ -1305,62 +1316,63 @@ describe('SubscriptionCallbackPlugin', () => { "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: request to http://mock-router-url.com/ failed, reason: network request error", "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Subscription completed without errors", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] `); - }); + }); - it('handles failed heartbeats with unexpected status codes', async () => { - const server = await startSubscriptionServer({ logger }); + it('handles failed heartbeats with unexpected status codes', async () => { + const server = await startSubscriptionServer({ logger }); - // Mock the initial check response from the router. - mockRouterCheckResponse(); + // Mock the initial check response from the router. + mockRouterCheckResponse(); - // Start the subscription; this triggers the initial check request and - // starts the heartbeat interval. This simulates an incoming subscription - // request from the router. - const result = await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql subscription { count } `, - extensions: { - subscription: { - callbackUrl: 'http://mock-router-url.com', - subscriptionId: '1234-cats', - verifier: 'my-verifier-token', + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, }, }, - }, - }), - ); + }), + ); - expect(result.status).toEqual(200); + expect(result.status).toEqual(200); - // 5 failures is the limit before the heartbeat is cancelled. We expect to - // see 5 errors and then a final error indicating the heartbeat was - // cancelled in the log snapshot below. - for (let i = 0; i < 5; i++) { - // mock heartbeat response failure - nock('http://mock-router-url.com') - .matchHeader('content-type', 'application/json') - .post('/', { - kind: 'subscription', - action: 'check', - id: '1234-cats', - verifier: 'my-verifier-token', - }) - .reply(500); - // trigger heartbeat - jest.advanceTimersByTime(5000); - } + // 5 failures is the limit before the heartbeat is cancelled. We expect to + // see 5 errors and then a final error indicating the heartbeat was + // cancelled in the log snapshot below. + for (let i = 0; i < 5; i++) { + // mock heartbeat response failure + nock('http://mock-router-url.com') + .matchHeader('content-type', 'application/json') + .post('/', { + kind: 'subscription', + action: 'check', + id: '1234-cats', + verifier: 'my-verifier-token', + }) + .reply(500); + // trigger heartbeat + jest.advanceTimersByTime(5000); + } - await server.stop(); + await server.stop(); - expect(logger.orderOfOperations).toMatchInlineSnapshot(` + expect(logger.orderOfOperations).toMatchInlineSnapshot(` [ "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", @@ -1389,85 +1401,86 @@ describe('SubscriptionCallbackPlugin', () => { "ERROR: SubscriptionManager[1234-cats]: Heartbeat request failed 5 times, terminating subscriptions and heartbeat interval: Unexpected status code: 500", "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Subscription completed without errors", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] `); - }); - - describe('retries', () => { - it('failed `check` requests', async () => { - const server = await startSubscriptionServer({ logger }); - - // Mock the initial check response from the router. We'll fail a couple - // first to test the retry logic. - mockRouterCheckResponseWithError(); - mockRouterCheckResponseWithError(); - mockRouterCheckResponse(); + }); - // Start the subscription; this triggers the initial check request and - // starts the heartbeat interval. This simulates an incoming subscription - // request from the router. - const result = await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + describe('retries', () => { + it('failed `check` requests', async () => { + const server = await startSubscriptionServer({ logger }); + + // Mock the initial check response from the router. We'll fail a couple + // first to test the retry logic. + mockRouterCheckResponseWithError(); + mockRouterCheckResponseWithError(); + mockRouterCheckResponse(); + + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql subscription { count } `, - extensions: { - subscription: { - callbackUrl: 'http://mock-router-url.com', - subscriptionId: '1234-cats', - verifier: 'my-verifier-token', + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, }, }, - }, - }), - ); - - expect(result.status).toEqual(200); - - // Mock the heartbeat response from the router. We'll trigger it once below - // after the subscription is initialized to make sure it works. - const firstHeartbeat = mockRouterCheckResponse(); - // Advance timers to trigger the heartbeat once. This consumes the one - // heartbeat mock from above. - jest.advanceTimersByTime(5000); - await firstHeartbeat; - - // Mock the update from the router, we'll trigger it below - const update = mockRouterNextResponse({ payload: { count: 1 } }); - - // Trigger a couple updates. These send `next` requests to the router. - logger.debug('TESTING: Triggering first update'); - await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + }), + ); + + expect(result.status).toEqual(200); + + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterCheckResponse(); + // Advance timers to trigger the heartbeat once. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // Mock the update from the router, we'll trigger it below + const update = mockRouterNextResponse({ payload: { count: 1 } }); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql mutation { addOne } `, - }, - }), - ); + }, + }), + ); - await update; + await update; - // When we shutdown the server, we'll stop listening for subscription - // updates, await unresolved requests, and send a `complete` request to the - // router for each active subscription. - const completeRequest = mockRouterCompleteResponse(); + // When we shutdown the server, we'll stop listening for subscription + // updates, await unresolved requests, and send a `complete` request to the + // router for each active subscription. + const completeRequest = mockRouterCompleteResponse(); - await server.stop(); - await completeRequest; + await server.stop(); + await completeRequest; - // The heartbeat should be cleaned up at this point. There is no second - // heartbeat mock, so if it ticks again it'll throw an error. - jest.advanceTimersByTime(5000); + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); - expect(logger.orderOfOperations).toMatchInlineSnapshot(` + expect(logger.orderOfOperations).toMatchInlineSnapshot(` [ "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", @@ -1490,106 +1503,107 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionManager[1234-cats]: \`complete\` request successful", "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Subscription completed without errors", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] `); - }); + }); - it('failed `next` requests', async () => { - const server = await startSubscriptionServer({ logger }); + it('failed `next` requests', async () => { + const server = await startSubscriptionServer({ logger }); - // Mock the initial check response from the router. - mockRouterCheckResponse(); + // Mock the initial check response from the router. + mockRouterCheckResponse(); - // Start the subscription; this triggers the initial check request and - // starts the heartbeat interval. This simulates an incoming subscription - // request from the router. - const result = await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql subscription { count } `, - extensions: { - subscription: { - callbackUrl: 'http://mock-router-url.com', - subscriptionId: '1234-cats', - verifier: 'my-verifier-token', + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, }, }, - }, - }), - ); - - expect(result.status).toEqual(200); - - // Mock the heartbeat response from the router. We'll trigger it once below - // after the subscription is initialized to make sure it works. - const firstHeartbeat = mockRouterCheckResponse(); - // Advance timers to trigger the heartbeat once. This consumes the one - // heartbeat mock from above. - jest.advanceTimersByTime(5000); - await firstHeartbeat; - - // Next we'll trigger some subscription events. In advance, we'll mock the - // router responses. These responses will fail the first 3 times and - // succeed on the 4th. The retry logic is expected to handle this - // gracefully. - const updates = Promise.all([ - mockRouterNextResponse({ payload: { count: 1 }, statusCode: 500 }), - mockRouterNextResponse({ payload: { count: 1 }, statusCode: 500 }), - mockRouterNextResponse({ payload: { count: 1 }, statusCode: 500 }), - mockRouterNextResponse({ payload: { count: 1 } }), - mockRouterNextResponse({ payload: { count: 2 }, statusCode: 500 }), - mockRouterNextResponse({ payload: { count: 2 }, statusCode: 500 }), - mockRouterNextResponse({ payload: { count: 2 }, statusCode: 500 }), - mockRouterNextResponse({ payload: { count: 2 } }), - ]); - - // Trigger a couple updates. These send `next` requests to the router. - logger.debug('TESTING: Triggering first update'); - await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + }), + ); + + expect(result.status).toEqual(200); + + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterCheckResponse(); + // Advance timers to trigger the heartbeat once. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // Next we'll trigger some subscription events. In advance, we'll mock the + // router responses. These responses will fail the first 3 times and + // succeed on the 4th. The retry logic is expected to handle this + // gracefully. + const updates = Promise.all([ + mockRouterNextResponse({ payload: { count: 1 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 1 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 1 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 1 } }), + mockRouterNextResponse({ payload: { count: 2 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 2 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 2 }, statusCode: 500 }), + mockRouterNextResponse({ payload: { count: 2 } }), + ]); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql mutation { addOne } `, - }, - }), - ); + }, + }), + ); - logger.debug('TESTING: Triggering second update'); - await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + logger.debug('TESTING: Triggering second update'); + await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql mutation { addOne } `, - }, - }), - ); + }, + }), + ); - await updates; + await updates; - // When we shutdown the server, we'll stop listening for subscription - // updates, await unresolved requests, and send a `complete` request to the - // router for each active subscription. - const completeRequest = mockRouterCompleteResponse(); + // When we shutdown the server, we'll stop listening for subscription + // updates, await unresolved requests, and send a `complete` request to the + // router for each active subscription. + const completeRequest = mockRouterCompleteResponse(); - await server.stop(); - await completeRequest; + await server.stop(); + await completeRequest; - // The heartbeat should be cleaned up at this point. There is no second - // heartbeat mock, so if it ticks again it'll throw an error. - jest.advanceTimersByTime(5000); + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); - expect(logger.orderOfOperations).toMatchInlineSnapshot(` + expect(logger.orderOfOperations).toMatchInlineSnapshot(` [ "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", @@ -1619,84 +1633,85 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionManager[1234-cats]: \`complete\` request successful", "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Subscription completed without errors", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] `); - }); + }); - it('failed `complete` requests', async () => { - const server = await startSubscriptionServer({ logger }); + it('failed `complete` requests', async () => { + const server = await startSubscriptionServer({ logger }); - // Mock the initial check response from the router. - mockRouterCheckResponse(); + // Mock the initial check response from the router. + mockRouterCheckResponse(); - // Start the subscription; this triggers the initial check request and - // starts the heartbeat interval. This simulates an incoming subscription - // request from the router. - const result = await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql subscription { count } `, - extensions: { - subscription: { - callbackUrl: 'http://mock-router-url.com', - subscriptionId: '1234-cats', - verifier: 'my-verifier-token', + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, }, }, - }, - }), - ); - - expect(result.status).toEqual(200); - - // Mock the heartbeat response from the router. We'll trigger it once below - // after the subscription is initialized to make sure it works. - const firstHeartbeat = mockRouterCheckResponse(); - // Advance timers to trigger the heartbeat once. This consumes the one - // heartbeat mock from above. - jest.advanceTimersByTime(5000); - await firstHeartbeat; - - // Mock the response to the upcoming subscription update. - const update = mockRouterNextResponse({ payload: { count: 1 } }); - - // Trigger a couple updates. These send `next` requests to the router. - logger.debug('TESTING: Triggering first update'); - await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + }), + ); + + expect(result.status).toEqual(200); + + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterCheckResponse(); + // Advance timers to trigger the heartbeat once. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // Mock the response to the upcoming subscription update. + const update = mockRouterNextResponse({ payload: { count: 1 } }); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql mutation { addOne } `, - }, - }), - ); + }, + }), + ); - await update; + await update; - // The server will send a `complete` request when it shuts down. Here we - // test that the retry logic works for sending `complete` requests. - const completeRetries = Promise.all([ - mockRouterCompleteResponse({ statusCode: 500 }), - mockRouterCompleteResponse({ statusCode: 500 }), - mockRouterCompleteResponse(), - ]); + // The server will send a `complete` request when it shuts down. Here we + // test that the retry logic works for sending `complete` requests. + const completeRetries = Promise.all([ + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse(), + ]); - await server.stop(); - await completeRetries; + await server.stop(); + await completeRetries; - // The heartbeat should be cleaned up at this point. There is no second - // heartbeat mock, so if it ticks again it'll throw an error. - jest.advanceTimersByTime(5000); + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); - expect(logger.orderOfOperations).toMatchInlineSnapshot(` + expect(logger.orderOfOperations).toMatchInlineSnapshot(` [ "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", @@ -1719,86 +1734,87 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionManager[1234-cats]: \`complete\` request successful", "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Subscription completed without errors", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] `); - }); + }); - it('`complete` requests to failure', async () => { - const server = await startSubscriptionServer({ logger }); + it('`complete` requests to failure', async () => { + const server = await startSubscriptionServer({ logger }); - // Mock the initial check response from the router. - mockRouterCheckResponse(); + // Mock the initial check response from the router. + mockRouterCheckResponse(); - // Start the subscription; this triggers the initial check request and - // starts the heartbeat interval. This simulates an incoming subscription - // request from the router. - const result = await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql subscription { count } `, - extensions: { - subscription: { - callbackUrl: 'http://mock-router-url.com', - subscriptionId: '1234-cats', - verifier: 'my-verifier-token', + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, }, }, - }, - }), - ); - - expect(result.status).toEqual(200); - - // Mock the heartbeat response from the router. We'll trigger it once below - // after the subscription is initialized to make sure it works. - const firstHeartbeat = mockRouterCheckResponse(); - // Advance timers to trigger the heartbeat once. This consumes the one - // heartbeat mock from above. - jest.advanceTimersByTime(5000); - await firstHeartbeat; - - // Mock the response to the upcoming subscription update. - const update = mockRouterNextResponse({ payload: { count: 1 } }); - - // Trigger a couple updates. These send `next` requests to the router. - logger.debug('TESTING: Triggering first update'); - await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + }), + ); + + expect(result.status).toEqual(200); + + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterCheckResponse(); + // Advance timers to trigger the heartbeat once. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // Mock the response to the upcoming subscription update. + const update = mockRouterNextResponse({ payload: { count: 1 } }); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql mutation { addOne } `, - }, - }), - ); + }, + }), + ); - await update; + await update; - // The server will send a `complete` request when it shuts down. Here we - // test that the server will retry max 5 times and give up. - const completeRetries = Promise.all([ - mockRouterCompleteResponse({ statusCode: 500 }), - mockRouterCompleteResponse({ statusCode: 500 }), - mockRouterCompleteResponse({ statusCode: 500 }), - mockRouterCompleteResponse({ statusCode: 500 }), - mockRouterCompleteResponse({ statusCode: 500 }), - ]); + // The server will send a `complete` request when it shuts down. Here we + // test that the server will retry max 5 times and give up. + const completeRetries = Promise.all([ + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse({ statusCode: 500 }), + mockRouterCompleteResponse({ statusCode: 500 }), + ]); - await server.stop(); - await completeRetries; + await server.stop(); + await completeRetries; - // The heartbeat should be cleaned up at this point. There is no second - // heartbeat mock, so if it ticks again it'll throw an error. - jest.advanceTimersByTime(5000); + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); - expect(logger.orderOfOperations).toMatchInlineSnapshot(` + expect(logger.orderOfOperations).toMatchInlineSnapshot(` [ "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", @@ -1824,86 +1840,87 @@ describe('SubscriptionCallbackPlugin', () => { "ERROR: SubscriptionManager[1234-cats]: \`complete\` request failed: \`complete\` request failed with unexpected status code: 500", "SubscriptionManager: Terminating subscriptions for ID: 1234-cats", "SubscriptionManager: Terminating heartbeat interval for http://mock-router-url.com", + "SubscriptionManager[1234-cats]: Subscription completed without errors", "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] `); - }); + }); - it('terminates subscription after max retries `next` requests', async () => { - const server = await startSubscriptionServer({ logger }); + it('terminates subscription after max retries `next` requests', async () => { + const server = await startSubscriptionServer({ logger }); - // Mock the initial check response from the router. - mockRouterCheckResponse(); + // Mock the initial check response from the router. + mockRouterCheckResponse(); - // Start the subscription; this triggers the initial check request and - // starts the heartbeat interval. This simulates an incoming subscription - // request from the router. - const result = await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + // Start the subscription; this triggers the initial check request and + // starts the heartbeat interval. This simulates an incoming subscription + // request from the router. + const result = await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql subscription { count } `, - extensions: { - subscription: { - callbackUrl: 'http://mock-router-url.com', - subscriptionId: '1234-cats', - verifier: 'my-verifier-token', + extensions: { + subscription: { + callbackUrl: 'http://mock-router-url.com', + subscriptionId: '1234-cats', + verifier: 'my-verifier-token', + }, }, }, - }, - }), - ); - - expect(result.status).toEqual(200); - - // Mock the heartbeat response from the router. We'll trigger it once below - // after the subscription is initialized to make sure it works. - const firstHeartbeat = mockRouterCheckResponse(); - // Advance timers to trigger the heartbeat once. This consumes the one - // heartbeat mock from above. - jest.advanceTimersByTime(5000); - await firstHeartbeat; - - // 5 failures to hit the retry limit - const updates = Promise.all( - [...new Array(5)].map(() => - mockRouterNextResponse({ - payload: { count: 1 }, - statusCode: 500, }), - ), - ); - - // Trigger a couple updates. These send `next` requests to the router. - logger.debug('TESTING: Triggering first update'); - await server.executeHTTPGraphQLRequest( - buildHTTPGraphQLRequest({ - body: { - query: `#graphql + ); + + expect(result.status).toEqual(200); + + // Mock the heartbeat response from the router. We'll trigger it once below + // after the subscription is initialized to make sure it works. + const firstHeartbeat = mockRouterCheckResponse(); + // Advance timers to trigger the heartbeat once. This consumes the one + // heartbeat mock from above. + jest.advanceTimersByTime(5000); + await firstHeartbeat; + + // 5 failures to hit the retry limit + const updates = Promise.all( + [...new Array(5)].map(() => + mockRouterNextResponse({ + payload: { count: 1 }, + statusCode: 500, + }), + ), + ); + + // Trigger a couple updates. These send `next` requests to the router. + logger.debug('TESTING: Triggering first update'); + await server.executeHTTPGraphQLRequest( + buildHTTPGraphQLRequest({ + body: { + query: `#graphql mutation { addOne } `, - }, - }), - ); + }, + }), + ); - // After 5 failures, the plugin will terminate the subscriptions without - // sending a `complete` request. - await updates; - // Jest needs a little help here to finish handling the retry failures - // and cancel the subscription. - await new Promise((resolve) => setTimeout(resolve, 100)); - await server.stop(); + // After 5 failures, the plugin will terminate the subscriptions without + // sending a `complete` request. + await updates; + // Jest needs a little help here to finish handling the retry failures + // and cancel the subscription. + await new Promise((resolve) => setTimeout(resolve, 100)); + await server.stop(); - // The heartbeat should be cleaned up at this point. There is no second - // heartbeat mock, so if it ticks again it'll throw an error. - jest.advanceTimersByTime(5000); + // The heartbeat should be cleaned up at this point. There is no second + // heartbeat mock, so if it ticks again it'll throw an error. + jest.advanceTimersByTime(5000); - expect(logger.orderOfOperations).toMatchInlineSnapshot(` + expect(logger.orderOfOperations).toMatchInlineSnapshot(` [ "SubscriptionCallback[1234-cats]: Received new subscription request", "SubscriptionManager[1234-cats]: Sending \`check\` request to router", @@ -1930,9 +1947,10 @@ describe('SubscriptionCallbackPlugin', () => { "SubscriptionCallback: Successfully cleaned up outstanding subscriptions and heartbeat intervals.", ] `); + }); }); - }); - }); + }, + ); }); async function startSubscriptionServer( diff --git a/packages/server/src/plugin/subscriptionCallback/index.ts b/packages/server/src/plugin/subscriptionCallback/index.ts index 02c07f83a7a..33258d751e0 100644 --- a/packages/server/src/plugin/subscriptionCallback/index.ts +++ b/packages/server/src/plugin/subscriptionCallback/index.ts @@ -197,6 +197,7 @@ function isAsyncIterable(value: any): value is AsyncIterable { interface SubscriptionObject { cancelled: boolean; + asyncIter: AsyncGenerator; startConsumingSubscription: () => Promise; completeSubscription: () => Promise; } @@ -523,6 +524,7 @@ class SubscriptionManager { const { subscription, heartbeat } = subscriptionInfo; if (subscription) { subscription.cancelled = true; + subscription.asyncIter?.return(); } // cleanup heartbeat for subscription if (heartbeat) { @@ -551,6 +553,7 @@ class SubscriptionManager { // allows us to break out of the `for await` and ignore future payloads. const self = this; const subscriptionObject = { + asyncIter: subscription, cancelled: false, async startConsumingSubscription() { self.logger?.debug(`Listening to graphql-js subscription`, id);