Skip to content

Commit

Permalink
introduce executeStreamIterator
Browse files Browse the repository at this point in the history
avoids needlessly rechecking whether a stream has begun
  • Loading branch information
yaacovCR committed Jan 17, 2023
1 parent c7d216e commit cb6fac3
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 82 deletions.
42 changes: 42 additions & 0 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,48 @@ describe('Execute: stream directive', () => {
},
]);
});
it('Can stream a field from a deferred path', async () => {
const document = parse(`
query {
... @defer {
friendList @stream(initialCount: 2) {
name
id
}
}
}
`);
const result = await complete(document, { friendList: friends });
expectJSON(result).toDeepEqual([
{
data: {},
hasNext: true,
},
{
incremental: [
{
data: {
friendList: [
{ name: 'Luke', id: '1' },
{ name: 'Han', id: '2' },
],
},
path: [],
},
],
hasNext: true,
},
{
incremental: [
{
items: [{ name: 'Leia', id: '3' }],
path: ['friendList', 2],
},
],
hasNext: false,
},
]);
});
it('Negative values of initialCount throw field errors on a field that returns an async iterable', async () => {
const document = parse(`
query {
Expand Down
188 changes: 106 additions & 82 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1195,33 +1195,39 @@ function completeListValue(
// This is specified as a simple map, however we're optimizing the path
// where the list contains no Promises by avoiding creating another Promise.
let containsPromise = false;
let previousAsyncPayloadRecord = asyncPayloadRecord;
const completedResults: Array<unknown> = [];
const iterator = result[Symbol.iterator]();
let index = 0;
for (const item of result) {
// eslint-disable-next-line no-constant-condition
while (true) {
// No need to modify the info object containing the path,
// since from here on it is not ever accessed by resolver functions.
const itemPath = exeContext.addPath(path, index, undefined);

if (
stream &&
typeof stream.initialCount === 'number' &&
index >= stream.initialCount
) {
previousAsyncPayloadRecord = executeStreamField(
path,
itemPath,
item,
executeStreamIterator(
index,
iterator,
exeContext,
fieldGroup,
info,
itemType,
previousAsyncPayloadRecord,
path,
asyncPayloadRecord,
);
index++;
continue;
break;
}

const { done, value: item } = iterator.next();
if (done) {
break;
}

const itemPath = exeContext.addPath(path, index, undefined);

if (
completeListItemValue(
item,
Expand Down Expand Up @@ -1886,107 +1892,125 @@ function executeDeferredFragment(
asyncPayloadRecord.addData(promiseOrData);
}

function executeStreamField(
path: Path,
itemPath: Path,
item: PromiseOrValue<unknown>,
function executeStreamIterator(
initialIndex: number,
iterator: Iterator<unknown>,
exeContext: ExecutionContext,
fieldGroup: FieldGroup,
info: GraphQLResolveInfo,
itemType: GraphQLOutputType,
path: Path,
parentContext?: AsyncPayloadRecord,
): AsyncPayloadRecord {
const asyncPayloadRecord = new StreamRecord({
deferDepth: parentContext?.deferDepth,
path: itemPath,
parentContext,
exeContext,
});
if (isPromise(item)) {
const completedItems = completePromisedValue(
exeContext,
itemType,
fieldGroup,
info,
itemPath,
item,
asyncPayloadRecord,
).then(
(value) => [value],
(error) => {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return null;
},
);
): void {
let index = initialIndex;
const deferDepth = parentContext?.deferDepth;
let previousAsyncPayloadRecord = parentContext ?? undefined;
// eslint-disable-next-line no-constant-condition
while (true) {
const { done, value: item } = iterator.next();
if (done) {
break;
}

asyncPayloadRecord.addItems(completedItems);
return asyncPayloadRecord;
}
const itemPath = exeContext.addPath(path, index, undefined);
const asyncPayloadRecord = new StreamRecord({
deferDepth,
path: itemPath,
parentContext: previousAsyncPayloadRecord,
exeContext,
});

let completedItem: PromiseOrValue<unknown>;
try {
try {
completedItem = completeValue(
if (isPromise(item)) {
const completedItems = completePromisedValue(
exeContext,
itemType,
fieldGroup,
info,
itemPath,
item,
asyncPayloadRecord,
).then(
(value) => [value],
(error) => {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return null;
},
);
} catch (rawError) {
const error = locatedError(
rawError,
toNodes(fieldGroup),
pathToArray(itemPath),
);
completedItem = handleFieldError(
error,
itemType,
asyncPayloadRecord.errors,
);
filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord);

asyncPayloadRecord.addItems(completedItems);
previousAsyncPayloadRecord = asyncPayloadRecord;
index++;
continue;
}
} catch (error) {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
asyncPayloadRecord.addItems(null);
return asyncPayloadRecord;
}

if (isPromise(completedItem)) {
const completedItems = completedItem
.then(undefined, (rawError) => {
let completedItem: PromiseOrValue<unknown>;
try {
try {
completedItem = completeValue(
exeContext,
itemType,
fieldGroup,
info,
itemPath,
item,
asyncPayloadRecord,
);
} catch (rawError) {
const error = locatedError(
rawError,
toNodes(fieldGroup),
pathToArray(itemPath),
);
const handledError = handleFieldError(
completedItem = handleFieldError(
error,
itemType,
asyncPayloadRecord.errors,
);
filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord);
return handledError;
})
.then(
(value) => [value],
(error) => {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return null;
},
);
}
} catch (error) {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
asyncPayloadRecord.addItems(null);
previousAsyncPayloadRecord = asyncPayloadRecord;
index++;
continue;
}

asyncPayloadRecord.addItems(completedItems);
return asyncPayloadRecord;
}
if (isPromise(completedItem)) {
const completedItems = completedItem
.then(undefined, (rawError) => {
const error = locatedError(
rawError,
toNodes(fieldGroup),
pathToArray(itemPath),
);
const handledError = handleFieldError(
error,
itemType,
asyncPayloadRecord.errors,
);
filterSubsequentPayloads(exeContext, itemPath, asyncPayloadRecord);
return handledError;
})
.then(
(value) => [value],
(error) => {
asyncPayloadRecord.errors.push(error);
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
return null;
},
);

asyncPayloadRecord.addItems(completedItems);
} else {
asyncPayloadRecord.addItems([completedItem]);
}

asyncPayloadRecord.addItems([completedItem]);
return asyncPayloadRecord;
previousAsyncPayloadRecord = asyncPayloadRecord;
index++;
}
}

async function executeStreamAsyncIteratorItem(
Expand Down

0 comments on commit cb6fac3

Please sign in to comment.