Skip to content

Commit

Permalink
Implement support for @stream directive
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/execution/execute.ts
#	src/validation/index.d.ts
#	src/validation/index.ts
  • Loading branch information
robrichard committed Jun 3, 2021
1 parent 0ceab21 commit ec7a4c4
Show file tree
Hide file tree
Showing 8 changed files with 1,219 additions and 17 deletions.
700 changes: 700 additions & 0 deletions src/execution/__tests__/stream-test.ts

Large diffs are not rendered by default.

255 changes: 238 additions & 17 deletions src/execution/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
GraphQLIncludeDirective,
GraphQLSkipDirective,
GraphQLDeferDirective,
GraphQLStreamDirective,
} from '../type/directives';
import {
isObjectType,
Expand Down Expand Up @@ -147,7 +148,7 @@ export interface FormattedExecutionResult<
* - `extensions` is reserved for adding non-standard properties.
*/
export interface ExecutionPatchResult<
TData = ObjMap<unknown>,
TData = ObjMap<unknown> | unknown,
TExtensions = ObjMap<unknown>,
> {
errors?: ReadonlyArray<GraphQLError>;
Expand All @@ -159,7 +160,7 @@ export interface ExecutionPatchResult<
}

export interface FormattedExecutionPatchResult<
TData = ObjMap<unknown>,
TData = ObjMap<unknown> | unknown,
TExtensions = ObjMap<unknown>,
> {
errors?: ReadonlyArray<GraphQLFormattedError>;
Expand Down Expand Up @@ -716,6 +717,44 @@ function getDeferValues(
};
}

/**
* Returns an object containing the @stream arguments if a field should be
* streamed based on the experimental flag, stream directive present and
* not disabled by the "if" argument.
*/
function getStreamValues(
exeContext: ExecutionContext,
fieldNodes: ReadonlyArray<FieldNode>,
):
| undefined
| {
initialCount?: number;
label?: string;
} {
// validation only allows equivalent streams on multiple fields, so it is
// safe to only check the first fieldNode for the stream directive
const stream = getDirectiveValues(
GraphQLStreamDirective,
fieldNodes[0],
exeContext.variableValues,
);

if (!stream) {
return;
}

if (stream.if === false) {
return;
}

return {
initialCount:
// istanbul ignore next (initialCount is required number argument)
typeof stream.initialCount === 'number' ? stream.initialCount : undefined,
label: typeof stream.label === 'string' ? stream.label : undefined,
};
}

/**
* Determines if a fragment is applicable to the given type.
*/
Expand Down Expand Up @@ -1004,8 +1043,28 @@ function completeAsyncIteratorValue(
errors: Array<GraphQLError>,
): Promise<ReadonlyArray<unknown>> {
let containsPromise = false;
const stream = getStreamValues(exeContext, fieldNodes);
return new Promise<ReadonlyArray<unknown>>((resolve) => {
function next(index: number, completedResults: Array<unknown>) {
if (
stream &&
typeof stream.initialCount === 'number' &&
index >= stream.initialCount
) {
exeContext.dispatcher.addAsyncIteratorValue(
index,
iterator,
exeContext,
fieldNodes,
info,
itemType,
path,
stream.label,
);
resolve(completedResults);
return;
}

const fieldPath = addPath(path, index, undefined);
iterator.next().then(
({ value, done }) => {
Expand Down Expand Up @@ -1094,15 +1153,37 @@ function completeListValue(
);
}

const stream = getStreamValues(exeContext, fieldNodes);

// This is specified as a simple map, however we're optimizing the path
// where the list contains no Promises by avoiding creating another Promise.
let containsPromise = false;
const completedResults = Array.from(result, (item, index) => {
const completedResults = [];
let index = 0;
for (const item of result) {
// No need to modify the info object containing the path,
// since from here on it is not ever accessed by resolver functions.
const itemPath = addPath(path, index, undefined);
try {
let completedItem;

if (
stream &&
typeof stream.initialCount === 'number' &&
index >= stream.initialCount
) {
exeContext.dispatcher.addValue(
itemPath,
item,
exeContext,
fieldNodes,
info,
itemType,
stream.label,
);
index++;
continue;
}
if (isPromise(item)) {
completedItem = item.then((resolved) =>
completeValue(
Expand Down Expand Up @@ -1131,21 +1212,25 @@ function completeListValue(
containsPromise = true;
// Note: we don't rely on a `catch` method, but we do expect "thenable"
// to take a second callback for the error case.
return completedItem.then(undefined, (rawError) => {
const error = locatedError(
rawError,
fieldNodes,
pathToArray(itemPath),
);
return handleFieldError(error, itemType, errors);
});
completedResults.push(
completedItem.then(undefined, (rawError) => {
const error = locatedError(
rawError,
fieldNodes,
pathToArray(itemPath),
);
return handleFieldError(error, itemType, errors);
}),
);
} else {
completedResults.push(completedItem);
}
return completedItem;
} catch (rawError) {
const error = locatedError(rawError, fieldNodes, pathToArray(itemPath));
return handleFieldError(error, itemType, errors);
completedResults.push(handleFieldError(error, itemType, errors));
}
});
index++;
}

return containsPromise ? Promise.all(completedResults) : completedResults;
}
Expand Down Expand Up @@ -1521,7 +1606,7 @@ export function getFieldDef(
*/
interface DispatcherResult {
errors?: ReadonlyArray<GraphQLError>;
data?: ObjMap<unknown> | null;
data?: ObjMap<unknown> | unknown | null;
path: ReadonlyArray<string | number>;
label?: string;
extensions?: ObjMap<unknown>;
Expand Down Expand Up @@ -1560,6 +1645,129 @@ export class Dispatcher {
);
}

addValue(
path: Path,
promiseOrData: PromiseOrValue<unknown>,
exeContext: ExecutionContext,
fieldNodes: ReadonlyArray<FieldNode>,
info: GraphQLResolveInfo,
itemType: GraphQLOutputType,
label?: string,
): void {
const errors: Array<GraphQLError> = [];
this._subsequentPayloads.push(
Promise.resolve(promiseOrData)
.then((resolved) =>
completeValue(
exeContext,
itemType,
fieldNodes,
info,
path,
resolved,
errors,
),
)
// Note: we don't rely on a `catch` method, but we do expect "thenable"
// to take a second callback for the error case.
.then(undefined, (rawError) => {
const error = locatedError(rawError, fieldNodes, pathToArray(path));
return handleFieldError(error, itemType, errors);
})
.then((data) => ({
value: createPatchResult(data, label, path, errors),
done: false,
})),
);
}

addAsyncIteratorValue(
initialIndex: number,
iterator: AsyncIterator<unknown>,
exeContext: ExecutionContext,
fieldNodes: ReadonlyArray<FieldNode>,
info: GraphQLResolveInfo,
itemType: GraphQLOutputType,
path?: Path,
label?: string,
): void {
const subsequentPayloads = this._subsequentPayloads;
function next(index: number) {
const fieldPath = addPath(path, index, undefined);
const patchErrors: Array<GraphQLError> = [];
subsequentPayloads.push(
iterator.next().then(
({ value: data, done }) => {
if (done) {
return { value: undefined, done: true };
}

// eslint-disable-next-line node/callback-return
next(index + 1);

try {
const completedItem = completeValue(
exeContext,
itemType,
fieldNodes,
info,
fieldPath,
data,
patchErrors,
);

if (isPromise(completedItem)) {
return completedItem.then((resolveItem) => ({
value: createPatchResult(
resolveItem,
label,
fieldPath,
patchErrors,
),
done: false,
}));
}

return {
value: createPatchResult(
completedItem,
label,
fieldPath,
patchErrors,
),
done: false,
};
} catch (rawError) {
const error = locatedError(
rawError,
fieldNodes,
pathToArray(fieldPath),
);
handleFieldError(error, itemType, patchErrors);
return {
value: createPatchResult(null, label, fieldPath, patchErrors),
done: false,
};
}
},
(rawError) => {
const error = locatedError(
rawError,
fieldNodes,
pathToArray(fieldPath),
);
handleFieldError(error, itemType, patchErrors);
return {
value: createPatchResult(null, label, fieldPath, patchErrors),
done: false,
};
},
),
);
}
next(initialIndex);
}

_race(): Promise<IteratorResult<ExecutionPatchResult, void>> {
return new Promise<{
promise: Promise<IteratorResult<DispatcherResult, void>>;
Expand All @@ -1579,7 +1787,20 @@ export class Dispatcher {
);
return promise;
})
.then(({ value }) => {
.then(({ value, done }) => {
if (done && this._subsequentPayloads.length === 0) {
// async iterable resolver just finished and no more pending payloads
return {
value: {
hasNext: false,
},
done: false,
};
} else if (done) {
// async iterable resolver just finished but there are pending payloads
// return the next one
return this._race();
}
const returnValue: ExecutionPatchResult = {
...value,
hasNext: this._subsequentPayloads.length > 0,
Expand Down Expand Up @@ -1621,7 +1842,7 @@ export class Dispatcher {
}

function createPatchResult(
data: ObjMap<unknown> | null,
data: ObjMap<unknown> | unknown | null,
label?: string,
path?: Path,
errors?: ReadonlyArray<GraphQLError>,
Expand Down
Loading

0 comments on commit ec7a4c4

Please sign in to comment.