From b63c09a70a552c2739e70747ea39c1e4747b61b8 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 17 Jan 2023 11:46:32 +0200 Subject: [PATCH] do not initiate multiple streams at the same path --- src/execution/__tests__/stream-test.ts | 101 +++++++++++++++++++++++++ src/execution/execute.ts | 61 ++++++++++----- 2 files changed, 141 insertions(+), 21 deletions(-) diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index f9e44fa5c07..0958732d032 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -678,6 +678,107 @@ describe('Execute: stream directive', () => { }, ]); }); + it('Does not initiate multiple streams at the same path', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 2) { + name + id + } + ... @defer { + friendList @stream(initialCount: 2) { + name + id + } + } + } + `); + const result = await complete(document, { friendList: friends }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [ + { name: 'Luke', id: '1' }, + { name: 'Han', id: '2' }, + ], + }, + hasNext: true, + }, + { + incremental: [ + { + items: [{ name: 'Leia', id: '3' }], + path: ['friendList', 2], + }, + { + data: { + friendList: [ + { name: 'Luke', id: '1' }, + { name: 'Han', id: '2' }, + ], + }, + path: [], + }, + ], + hasNext: false, + }, + ]); + }); + it('Does not initiate multiple streams at the same path for async iterables', async () => { + const document = parse(` + query { + friendList @stream(initialCount: 2) { + name + id + } + ... @defer { + friendList @stream(initialCount: 2) { + name + id + } + } + } + `); + const result = await complete(document, { + async *friendList() { + yield await Promise.resolve(friends[0]); + yield await Promise.resolve(friends[1]); + yield await Promise.resolve(friends[2]); + }, + }); + expectJSON(result).toDeepEqual([ + { + data: { + friendList: [ + { name: 'Luke', id: '1' }, + { name: 'Han', id: '2' }, + ], + }, + hasNext: true, + }, + { + incremental: [ + { + data: { + friendList: [ + { name: 'Luke', id: '1' }, + { name: 'Han', id: '2' }, + ], + }, + path: [], + }, + { + items: [{ name: 'Leia', id: '3' }], + path: ['friendList', 2], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); it('Negative values of initialCount throw field errors on a field that returns an async iterable', async () => { const document = parse(` query { diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 451db265265..6fda10238a8 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -125,6 +125,7 @@ export interface ExecutionContext { errors: Array; subsequentPayloads: Set; branches: WeakMap>; + streams: WeakSet; } /** @@ -505,6 +506,7 @@ export function buildExecutionContext( addPath: createPathFactory(), subsequentPayloads: new Set(), branches: new WeakMap(), + streams: new WeakSet(), errors: [], }; } @@ -519,6 +521,7 @@ function buildPerEventExecutionContext( addPath: createPathFactory(), subsequentPayloads: new Set(), branches: new WeakMap(), + streams: new WeakSet(), errors: [], }; } @@ -540,6 +543,18 @@ function shouldBranch( return false; } +function shouldStream( + fieldGroup: FieldGroup, + exeContext: ExecutionContext, +): boolean { + const hasStreamed = exeContext.streams.has(fieldGroup); + if (hasStreamed) { + return false; + } + exeContext.streams.add(fieldGroup); + return true; +} + /** * Implements the "Executing operations" section of the spec. */ @@ -1102,17 +1117,19 @@ async function completeAsyncIteratorValue( typeof stream.initialCount === 'number' && index >= stream.initialCount ) { - // eslint-disable-next-line @typescript-eslint/no-floating-promises - executeStreamAsyncIterator( - index, - asyncIterator, - exeContext, - fieldGroup, - info, - itemType, - path, - asyncPayloadRecord, - ); + if (shouldStream(fieldGroup, exeContext)) { + // eslint-disable-next-line @typescript-eslint/no-floating-promises + executeStreamAsyncIterator( + index, + asyncIterator, + exeContext, + fieldGroup, + info, + itemType, + path, + asyncPayloadRecord, + ); + } break; } @@ -1208,16 +1225,18 @@ function completeListValue( typeof stream.initialCount === 'number' && index >= stream.initialCount ) { - executeStreamIterator( - index, - iterator, - exeContext, - fieldGroup, - info, - itemType, - path, - asyncPayloadRecord, - ); + if (shouldStream(fieldGroup, exeContext)) { + executeStreamIterator( + index, + iterator, + exeContext, + fieldGroup, + info, + itemType, + path, + asyncPayloadRecord, + ); + } break; }