Skip to content

Commit

Permalink
do not initiate multiple streams at the same path
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jan 17, 2023
1 parent cb6fac3 commit b63c09a
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 21 deletions.
101 changes: 101 additions & 0 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,107 @@ describe('Execute: stream directive', () => {
},
]);
});
it('Does not initiate multiple streams at the same path', async () => {
const document = parse(`
query {
friendList @stream(initialCount: 2) {
name
id
}
... @defer {
friendList @stream(initialCount: 2) {
name
id
}
}
}
`);
const result = await complete(document, { friendList: friends });
expectJSON(result).toDeepEqual([
{
data: {
friendList: [
{ name: 'Luke', id: '1' },
{ name: 'Han', id: '2' },
],
},
hasNext: true,
},
{
incremental: [
{
items: [{ name: 'Leia', id: '3' }],
path: ['friendList', 2],
},
{
data: {
friendList: [
{ name: 'Luke', id: '1' },
{ name: 'Han', id: '2' },
],
},
path: [],
},
],
hasNext: false,
},
]);
});
it('Does not initiate multiple streams at the same path for async iterables', async () => {
const document = parse(`
query {
friendList @stream(initialCount: 2) {
name
id
}
... @defer {
friendList @stream(initialCount: 2) {
name
id
}
}
}
`);
const result = await complete(document, {
async *friendList() {
yield await Promise.resolve(friends[0]);
yield await Promise.resolve(friends[1]);
yield await Promise.resolve(friends[2]);
},
});
expectJSON(result).toDeepEqual([
{
data: {
friendList: [
{ name: 'Luke', id: '1' },
{ name: 'Han', id: '2' },
],
},
hasNext: true,
},
{
incremental: [
{
data: {
friendList: [
{ name: 'Luke', id: '1' },
{ name: 'Han', id: '2' },
],
},
path: [],
},
{
items: [{ name: 'Leia', id: '3' }],
path: ['friendList', 2],
},
],
hasNext: true,
},
{
hasNext: false,
},
]);
});
it('Negative values of initialCount throw field errors on a field that returns an async iterable', async () => {
const document = parse(`
query {
Expand Down
61 changes: 40 additions & 21 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ export interface ExecutionContext {
errors: Array<GraphQLError>;
subsequentPayloads: Set<AsyncPayloadRecord>;
branches: WeakMap<GroupedFieldSet, Set<Path | undefined>>;
streams: WeakSet<FieldGroup>;
}

/**
Expand Down Expand Up @@ -505,6 +506,7 @@ export function buildExecutionContext(
addPath: createPathFactory(),
subsequentPayloads: new Set(),
branches: new WeakMap(),
streams: new WeakSet(),
errors: [],
};
}
Expand All @@ -519,6 +521,7 @@ function buildPerEventExecutionContext(
addPath: createPathFactory(),
subsequentPayloads: new Set(),
branches: new WeakMap(),
streams: new WeakSet(),
errors: [],
};
}
Expand All @@ -540,6 +543,18 @@ function shouldBranch(
return false;
}

function shouldStream(
fieldGroup: FieldGroup,
exeContext: ExecutionContext,
): boolean {
const hasStreamed = exeContext.streams.has(fieldGroup);
if (hasStreamed) {
return false;
}
exeContext.streams.add(fieldGroup);
return true;
}

/**
* Implements the "Executing operations" section of the spec.
*/
Expand Down Expand Up @@ -1102,17 +1117,19 @@ async function completeAsyncIteratorValue(
typeof stream.initialCount === 'number' &&
index >= stream.initialCount
) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
executeStreamAsyncIterator(
index,
asyncIterator,
exeContext,
fieldGroup,
info,
itemType,
path,
asyncPayloadRecord,
);
if (shouldStream(fieldGroup, exeContext)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
executeStreamAsyncIterator(
index,
asyncIterator,
exeContext,
fieldGroup,
info,
itemType,
path,
asyncPayloadRecord,
);
}
break;
}

Expand Down Expand Up @@ -1208,16 +1225,18 @@ function completeListValue(
typeof stream.initialCount === 'number' &&
index >= stream.initialCount
) {
executeStreamIterator(
index,
iterator,
exeContext,
fieldGroup,
info,
itemType,
path,
asyncPayloadRecord,
);
if (shouldStream(fieldGroup, exeContext)) {
executeStreamIterator(
index,
iterator,
exeContext,
fieldGroup,
info,
itemType,
path,
asyncPayloadRecord,
);
}
break;
}

Expand Down

0 comments on commit b63c09a

Please sign in to comment.