From aea5bf82e5d9dbb37da98342fcfd150f8ac81aef Mon Sep 17 00:00:00 2001 From: Kelvin Jin Date: Mon, 18 Dec 2017 18:20:19 -0800 Subject: [PATCH 1/2] grpc-js-core: handle eos headers as trailers --- packages/grpc-js-core/src/call-stream.ts | 136 +++++++++++------- packages/grpc-js-core/src/channel.ts | 5 +- .../src/metadata-status-filter.ts | 36 +++++ packages/grpc-js-core/src/metadata.ts | 2 +- 4 files changed, 125 insertions(+), 54 deletions(-) create mode 100644 packages/grpc-js-core/src/metadata-status-filter.ts diff --git a/packages/grpc-js-core/src/call-stream.ts b/packages/grpc-js-core/src/call-stream.ts index f62e42afa..0d90136d8 100644 --- a/packages/grpc-js-core/src/call-stream.ts +++ b/packages/grpc-js-core/src/call-stream.ts @@ -105,6 +105,13 @@ export class Http2CallStream extends Duplex implements CallStream { // Status code mapped from :status. To be used if grpc-status is not received private mappedStatusCode: Status = Status.UNKNOWN; + // Promise objects that are re-assigned to resolving promises when headers + // or trailers received. Processing headers/trailers is asynchronous, so we + // can use these objects to await their completion. This helps us establish + // order of precedence when obtaining the status of the call. + private handlingHeaders = Promise.resolve(); + private handlingTrailers = Promise.resolve(); + // This is populated (non-null) if and only if the call has ended private finalStatus: StatusObject|null = null; @@ -116,6 +123,11 @@ export class Http2CallStream extends Duplex implements CallStream { this.filterStack = filterStackFactory.createFilter(this); } + /** + * On first call, emits a 'status' event with the given StatusObject. + * Subsequent calls are no-ops. + * @param status The status of the call. + */ private endCall(status: StatusObject): void { if (this.finalStatus === null) { this.finalStatus = status; @@ -135,12 +147,46 @@ export class Http2CallStream extends Duplex implements CallStream { return canPush; } + private handleTrailers(headers: http2.IncomingHttpHeaders) { + let code: Status = this.mappedStatusCode; + let details = ''; + let metadata: Metadata; + try { + metadata = Metadata.fromHttp2Headers(headers); + } catch (e) { + metadata = new Metadata(); + } + let status: StatusObject = {code, details, metadata}; + this.handlingTrailers = (async () => { + let finalStatus; + try { + // Attempt to assign final status. + finalStatus = await this.filterStack.receiveTrailers(Promise.resolve(status)); + } catch (error) { + await this.handlingHeaders; + // This is a no-op if the call was already ended when handling headers. + this.endCall({ + code: Status.INTERNAL, + details: 'Failed to process received status', + metadata: new Metadata() + }); + return; + } + // It's possible that headers were received but not fully handled yet. + // Give the headers handler an opportunity to end the call first, + // if an error occurred. + await this.handlingHeaders; + // This is a no-op if the call was already ended when handling headers. + this.endCall(finalStatus); + })(); + } + attachHttp2Stream(stream: http2.ClientHttp2Stream): void { if (this.finalStatus !== null) { stream.rstWithCancel(); } else { this.http2Stream = stream; - stream.on('response', (headers) => { + stream.on('response', (headers, flags) => { switch (headers[HTTP2_HEADER_STATUS]) { // TODO(murgatroid99): handle 100 and 101 case '400': @@ -166,57 +212,27 @@ export class Http2CallStream extends Duplex implements CallStream { } delete headers[HTTP2_HEADER_STATUS]; delete headers[HTTP2_HEADER_CONTENT_TYPE]; - let metadata: Metadata; - try { - metadata = Metadata.fromHttp2Headers(headers); - } catch (e) { - this.cancelWithStatus(Status.UNKNOWN, e.message); - return; - } - this.filterStack.receiveMetadata(Promise.resolve(metadata)) - .then( - (finalMetadata) => { - this.emit('metadata', finalMetadata); - }, - (error) => { - this.cancelWithStatus(Status.UNKNOWN, error.message); - }); - }); - stream.on('trailers', (headers: http2.IncomingHttpHeaders) => { - let code: Status = this.mappedStatusCode; - let details = ''; - if (typeof headers['grpc-status'] === 'string') { - let receivedCode = Number(headers['grpc-status']); - if (receivedCode in Status) { - code = receivedCode; - } else { - code = Status.UNKNOWN; + if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) { + this.handleTrailers(headers); + } else { + let metadata: Metadata; + try { + metadata = Metadata.fromHttp2Headers(headers); + } catch (error) { + this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()}); + return; } - delete headers['grpc-status']; - } - if (typeof headers['grpc-message'] === 'string') { - details = decodeURI(headers['grpc-message'] as string); + this.handlingHeaders = + this.filterStack.receiveMetadata(Promise.resolve(metadata)) + .then((finalMetadata) => { + this.emit('metadata', finalMetadata); + }).catch((error) => { + this.destroyHttp2Stream(); + this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()}); + }); } - let metadata: Metadata; - try { - metadata = Metadata.fromHttp2Headers(headers); - } catch (e) { - metadata = new Metadata(); - } - let status: StatusObject = {code, details, metadata}; - this.filterStack.receiveTrailers(Promise.resolve(status)) - .then( - (finalStatus) => { - this.endCall(finalStatus); - }, - (error) => { - this.endCall({ - code: Status.INTERNAL, - details: 'Failed to process received status', - metadata: new Metadata() - }); - }); }); + stream.on('trailers', this.handleTrailers.bind(this)); stream.on('data', (data) => { let readHead = 0; let canPush = true; @@ -278,7 +294,7 @@ export class Http2CallStream extends Duplex implements CallStream { this.unpushedReadMessages.push(null); } }); - stream.on('streamClosed', (errorCode) => { + stream.on('streamClosed', async (errorCode) => { let code: Status; let details = ''; switch (errorCode) { @@ -299,6 +315,13 @@ export class Http2CallStream extends Duplex implements CallStream { default: code = Status.INTERNAL; } + // This guarantees that if trailers were received, the value of the + // 'grpc-status' header takes precedence for emitted status data. + await this.handlingTrailers; + // This is a no-op if trailers were received at all. + // This is OK, because status codes emitted here correspond to more + // catastrophic issues that prevent us from receiving trailers in the + // first place. this.endCall({code: code, details: details, metadata: new Metadata()}); }); stream.on('error', (err: Error) => { @@ -323,8 +346,7 @@ export class Http2CallStream extends Duplex implements CallStream { } } - cancelWithStatus(status: Status, details: string): void { - this.endCall({code: status, details: details, metadata: new Metadata()}); + private destroyHttp2Stream() { // The http2 stream could already have been destroyed if cancelWithStatus // is called in response to an internal http2 error. if (this.http2Stream !== null && !this.http2Stream.destroyed) { @@ -334,6 +356,16 @@ export class Http2CallStream extends Duplex implements CallStream { } } + cancelWithStatus(status: Status, details: string): void { + this.destroyHttp2Stream(); + (async () => { + // If trailers are currently being processed, the call should be ended + // by handleTrailers instead. + await this.handlingTrailers; + this.endCall({code: status, details: details, metadata: new Metadata()}); + })(); + } + getDeadline(): Deadline { return this.options.deadline; } diff --git a/packages/grpc-js-core/src/channel.ts b/packages/grpc-js-core/src/channel.ts index 5cdbf4075..773185df0 100644 --- a/packages/grpc-js-core/src/channel.ts +++ b/packages/grpc-js-core/src/channel.ts @@ -12,6 +12,7 @@ import {Status} from './constants'; import {DeadlineFilterFactory} from './deadline-filter'; import {FilterStackFactory} from './filter-stack'; import {Metadata, MetadataObject} from './metadata'; +import { MetadataStatusFilterFactory } from './metadata-status-filter'; const IDLE_TIMEOUT_MS = 300000; @@ -177,7 +178,9 @@ export class Http2Channel extends EventEmitter implements Channel { } this.filterStackFactory = new FilterStackFactory([ new CompressionFilterFactory(this), - new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this) + new CallCredentialsFilterFactory(this), + new DeadlineFilterFactory(this), + new MetadataStatusFilterFactory(this) ]); this.currentBackoffDeadline = new Date(); /* The only purpose of these lines is to ensure that this.backoffTimerId has diff --git a/packages/grpc-js-core/src/metadata-status-filter.ts b/packages/grpc-js-core/src/metadata-status-filter.ts new file mode 100644 index 000000000..43d42ea64 --- /dev/null +++ b/packages/grpc-js-core/src/metadata-status-filter.ts @@ -0,0 +1,36 @@ +import {CallStream} from './call-stream'; +import {Channel} from './channel'; +import {BaseFilter, Filter, FilterFactory} from './filter'; +import {StatusObject} from './call-stream'; +import {Status} from './constants'; + +export class MetadataStatusFilter extends BaseFilter implements Filter { + async receiveTrailers(status: Promise): Promise { + let { code, details, metadata } = await status; + if (code !== Status.UNKNOWN) { + // we already have a known status, so don't assign a new one. + return { code, details, metadata }; + } + const metadataMap = metadata.getMap(); + if (typeof metadataMap['grpc-status'] === 'string') { + let receivedCode = Number(metadataMap['grpc-status']); + if (receivedCode in Status) { + code = receivedCode; + } + metadata.remove('grpc-status'); + } + if (typeof metadataMap['grpc-message'] === 'string') { + details = decodeURI(metadataMap['grpc-message'] as string); + metadata.remove('grpc-message'); + } + return { code, details, metadata }; + } +} + +export class MetadataStatusFilterFactory implements + FilterFactory { + constructor(private readonly channel: Channel) {} + createFilter(callStream: CallStream): MetadataStatusFilter { + return new MetadataStatusFilter(); + } +} diff --git a/packages/grpc-js-core/src/metadata.ts b/packages/grpc-js-core/src/metadata.ts index c2ad6332a..dbdb4e914 100644 --- a/packages/grpc-js-core/src/metadata.ts +++ b/packages/grpc-js-core/src/metadata.ts @@ -26,7 +26,7 @@ function isLegalKey(key: string): boolean { } function isLegalNonBinaryValue(value: string): boolean { - return !!value.match(/^[ -~]+$/); + return !!value.match(/^[ -~]*$/); } function isBinaryKey(key: string): boolean { From 22438ae67819d44cf8d14d8960fa025e80c0f415 Mon Sep 17 00:00:00 2001 From: Kelvin Jin Date: Wed, 20 Dec 2017 14:31:13 -0800 Subject: [PATCH 2/2] grpc-js-core: rename streamClosed event to close --- packages/grpc-js-core/src/call-stream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/grpc-js-core/src/call-stream.ts b/packages/grpc-js-core/src/call-stream.ts index 0d90136d8..bff5b5d9a 100644 --- a/packages/grpc-js-core/src/call-stream.ts +++ b/packages/grpc-js-core/src/call-stream.ts @@ -294,7 +294,7 @@ export class Http2CallStream extends Duplex implements CallStream { this.unpushedReadMessages.push(null); } }); - stream.on('streamClosed', async (errorCode) => { + stream.on('close', async (errorCode) => { let code: Status; let details = ''; switch (errorCode) {