diff --git a/src/execution/executor.ts b/src/execution/executor.ts index faf8e58b2d..230dbd190c 100644 --- a/src/execution/executor.ts +++ b/src/execution/executor.ts @@ -138,29 +138,6 @@ interface BundlerInterface { setTotal: (total: number) => void; } -interface StreamResponseContext extends SubsequentResponseContext { - atIndex: number; -} - -interface BatchedParallelStreamResponseContext - extends SubsequentResponseContext { - atIndices: Array; -} - -interface StreamDataBundleContext extends StreamResponseContext { - result: unknown; -} - -interface BatchedSequentialStreamDataBundleContext - extends StreamResponseContext { - results: Array; -} - -interface BatchedParallelStreamDataBundleContext - extends BatchedParallelStreamResponseContext { - results: Array; -} - interface StreamDataResult { responseNode: ResponseNode; data: unknown; @@ -1519,6 +1496,81 @@ export class Executor { context.responseNodes.push(responseNode); } + createBundler< + TDataContext extends SubsequentResponseContext, + TErrorContext extends SubsequentResponseContext, + >( + exeContext: ExecutionContext, + parentResponseNode: ResponseNode, + initialCount: number, + maxChunkSize: number, + maxInterval: Maybe, + resultToNewDataContext: ( + index: number, + result: StreamDataResult, + ) => TDataContext, + indexToNewErrorContext: (index: number) => TErrorContext, + onSubsequentData: ( + index: number, + result: StreamDataResult, + context: TDataContext, + ) => void, + onSubsequentError: (index: number, context: TErrorContext) => void, + dataContextToIncrementalResult: ( + context: TDataContext, + ) => IncrementalResult, + errorContextToIncrementalResult: ( + context: TErrorContext, + ) => IncrementalResult, + ): Bundler { + return new Bundler< + StreamDataResult, + ResponseNode, + TDataContext, + TErrorContext + >({ + initialIndex: initialCount, + maxBundleSize: maxChunkSize, + maxInterval, + createDataBundleContext: (index, result) => + this.onNewBundleContext( + exeContext.state, + resultToNewDataContext(index, result), + result.responseNode, + ), + createErrorBundleContext: (index, responseNode) => + this.onNewBundleContext( + exeContext.state, + indexToNewErrorContext(index), + responseNode, + ), + onSubsequentData: (index, result, context) => { + this.onSubsequentResponseNode( + exeContext.state, + context, + result.responseNode, + ); + onSubsequentData(index, result, context); + }, + onSubsequentError: (index, responseNode, context) => { + this.onSubsequentResponseNode(exeContext.state, context, responseNode); + onSubsequentError(index, context); + }, + onDataBundle: (context) => + exeContext.publisher.queue( + context.responseNodes, + dataContextToIncrementalResult(context), + parentResponseNode, + ), + onErrorBundle: (context) => + exeContext.publisher.queue( + context.responseNodes, + errorContextToIncrementalResult(context), + parentResponseNode, + ), + }); + } + createStreamContext( exeContext: ExecutionContext, initialCount: number, @@ -1530,65 +1582,42 @@ export class Executor { parentResponseNode: ResponseNode, ): StreamContext { if (maxChunkSize === 1) { - const bundler = new Bundler< - StreamDataResult, - ResponseNode, - StreamDataBundleContext, - StreamResponseContext - >({ - initialIndex: initialCount, - maxBundleSize: maxChunkSize, + const bundler = this.createBundler( + exeContext, + parentResponseNode, + initialCount, + maxChunkSize, maxInterval, - createDataBundleContext: (index, result) => - this.onNewBundleContext( - exeContext.state, - { - responseNodes: [], - parentResponseNode, - result: result.data, - atIndex: index, - }, - result.responseNode, - ), - createErrorBundleContext: (index, responseNode) => - this.onNewBundleContext( - exeContext.state, - { - responseNodes: [], - parentResponseNode, - atIndex: index, - }, - responseNode, - ) /* c8 ignore start */, - onSubsequentData: () => { + (index, result) => ({ + responseNodes: [], + parentResponseNode, + result: result.data, + atIndex: index, + }), + (index) => ({ + responseNodes: [], + parentResponseNode, + atIndex: index, + }) /* c8 ignore start */, + () => { /* with maxBundleSize of 1, this function will never be called */ }, - onSubsequentError: () => { + () => { /* with maxBundleSize of 1, this function will never be called */ } /* c8 ignore stop */, - onDataBundle: (context) => - exeContext.publisher.queue( - context.responseNodes, - { - responseContext: context, - data: context.result, - path: addPath(path, context.atIndex, undefined), - label, - }, - parentResponseNode, - ), - onErrorBundle: (context) => - exeContext.publisher.queue( - context.responseNodes, - { - responseContext: context, - data: null, - path: addPath(path, context.atIndex, undefined), - label, - }, - parentResponseNode, - ), - }); + (context) => ({ + responseContext: context, + data: context.result, + path: addPath(path, context.atIndex, undefined), + label, + }), + (context) => ({ + responseContext: context, + data: null, + path: addPath(path, context.atIndex, undefined), + label, + }), + ); return { initialCount, @@ -1603,78 +1632,45 @@ export class Executor { return { initialCount, path, - bundler: new Bundler< - StreamDataResult, - ResponseNode, - BatchedParallelStreamDataBundleContext, - BatchedParallelStreamResponseContext - >({ - initialIndex: initialCount, - maxBundleSize: maxChunkSize, + bundler: this.createBundler( + exeContext, + parentResponseNode, + initialCount, + maxChunkSize, maxInterval, - createDataBundleContext: (index, result) => - this.onNewBundleContext( - exeContext.state, - { - responseNodes: [], - parentResponseNode, - atIndices: [index], - results: [result.data], - }, - result.responseNode, - ), - createErrorBundleContext: (index, responseNode) => - this.onNewBundleContext( - exeContext.state, - { - responseNodes: [], - parentResponseNode, - atIndices: [index], - }, - responseNode, - ), - onSubsequentData: (index, result, context) => { - this.onSubsequentResponseNode( - exeContext.state, - context, - result.responseNode, - ); + (index, result) => ({ + responseNodes: [], + parentResponseNode, + atIndices: [index], + results: [result.data], + }), + (index) => ({ + responseNodes: [], + parentResponseNode, + atIndices: [index], + }), + (index, result, context) => { context.results.push(result.data); context.atIndices.push(index); }, - onSubsequentError: (index, responseNode, context) => { - this.onSubsequentResponseNode( - exeContext.state, - context, - responseNode, - ); + (index, context) => { context.atIndices.push(index); }, - onDataBundle: (context) => - exeContext.publisher.queue( - context.responseNodes, - { - responseContext: context, - data: context.results, - path, - atIndices: context.atIndices, - label, - }, - parentResponseNode, - ), - onErrorBundle: (context) => - exeContext.publisher.queue( - context.responseNodes, - { - responseContext: context, - data: null, - path, - atIndices: context.atIndices, - label, - }, - parentResponseNode, - ), - }), + (context) => ({ + responseContext: context, + data: context.results, + path, + atIndices: context.atIndices, + label, + }), + (context) => ({ + responseContext: context, + data: null, + path, + atIndices: context.atIndices, + label, + }), + ), }; } @@ -1683,75 +1679,44 @@ export class Executor { path, bundler: getSequentialBundler( initialCount, - new Bundler< - StreamDataResult, - ResponseNode, - BatchedSequentialStreamDataBundleContext, - StreamResponseContext - >({ - initialIndex: initialCount, - maxBundleSize: maxChunkSize, + this.createBundler( + exeContext, + parentResponseNode, + initialCount, + maxChunkSize, maxInterval, - createDataBundleContext: (index, result) => - this.onNewBundleContext( - exeContext.state, - { - responseNodes: [], - parentResponseNode, - atIndex: index, - results: [result.data], - }, - result.responseNode, - ), - createErrorBundleContext: (index, responseNode) => - this.onNewBundleContext( - exeContext.state, - { - responseNodes: [], - parentResponseNode, - atIndex: index, - }, - responseNode, - ), - onSubsequentData: (_index, result, context) => { - this.onSubsequentResponseNode( - exeContext.state, - context, - result.responseNode, - ); + (index, result) => ({ + responseNodes: [], + parentResponseNode, + atIndex: index, + results: [result.data], + }), + (index) => ({ + responseNodes: [], + parentResponseNode, + atIndex: index, + }), + (_index, result, context) => { context.results.push(result.data); - }, - onSubsequentError: (_index, responseNode, context) => - this.onSubsequentResponseNode( - exeContext.state, - context, - responseNode, - ), - onDataBundle: (context) => - exeContext.publisher.queue( - context.responseNodes, - { - responseContext: context, - data: context.results, - path, - atIndex: context.atIndex, - label, - }, - parentResponseNode, - ), - onErrorBundle: (context) => - exeContext.publisher.queue( - context.responseNodes, - { - responseContext: context, - data: null, - path, - atIndex: context.atIndex, - label, - }, - parentResponseNode, - ), - }), + } /* c8 ignore start */, + () => { + /* with serial bundlers and no data, no additional action is needed */ + } /* c8 ignore stop */, + (context) => ({ + responseContext: context, + data: context.results, + path, + atIndex: context.atIndex, + label, + }), + (context) => ({ + responseContext: context, + data: null, + path, + atIndex: context.atIndex, + label, + }), + ), ), }; }