Skip to content

Commit

Permalink
grpc-js-core: handle eos headers as trailers
Browse files Browse the repository at this point in the history
  • Loading branch information
kjin committed Dec 20, 2017
1 parent 7b1ab1c commit ef616c3
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 54 deletions.
136 changes: 84 additions & 52 deletions packages/grpc-js-core/src/call-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ export class Http2CallStream extends Duplex implements CallStream {
// Status code mapped from :status. To be used if grpc-status is not received
private mappedStatusCode: Status = Status.UNKNOWN;

// Promise objects that are re-assigned to resolving promises when headers
// or trailers received. Processing headers/trailers is asynchronous, so we
// can use these objects to await their completion. This helps us establish
// order of precedence when obtaining the status of the call.
private handlingHeaders = Promise.resolve();
private handlingTrailers = Promise.resolve();

// This is populated (non-null) if and only if the call has ended
private finalStatus: StatusObject|null = null;

Expand All @@ -116,6 +123,11 @@ export class Http2CallStream extends Duplex implements CallStream {
this.filterStack = filterStackFactory.createFilter(this);
}

/**
* On first call, emits a 'status' event with the given StatusObject.
* Subsequent calls are no-ops.
* @param status The status of the call.
*/
private endCall(status: StatusObject): void {
if (this.finalStatus === null) {
this.finalStatus = status;
Expand All @@ -135,12 +147,46 @@ export class Http2CallStream extends Duplex implements CallStream {
return canPush;
}

private handleTrailers(headers: http2.IncomingHttpHeaders) {
let code: Status = this.mappedStatusCode;
let details = '';
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
metadata = new Metadata();
}
let status: StatusObject = {code, details, metadata};
this.handlingTrailers = (async () => {
let finalStatus;
try {
// Attempt to assign final status.
finalStatus = await this.filterStack.receiveTrailers(Promise.resolve(status));
} catch (error) {
await this.handlingHeaders;
// This is a no-op if the call was already ended when handling headers.
this.endCall({
code: Status.INTERNAL,
details: 'Failed to process received status',
metadata: new Metadata()
});
return;
}
// It's possible that headers were received but not fully handled yet.
// Give the headers handler an opportunity to end the call first,
// if an error occurred.
await this.handlingHeaders;
// This is a no-op if the call was already ended when handling headers.
this.endCall(finalStatus);
})();
}

attachHttp2Stream(stream: http2.ClientHttp2Stream): void {
if (this.finalStatus !== null) {
stream.rstWithCancel();
} else {
this.http2Stream = stream;
stream.on('response', (headers) => {
stream.on('response', (headers, flags) => {
switch (headers[HTTP2_HEADER_STATUS]) {
// TODO(murgatroid99): handle 100 and 101
case '400':
Expand All @@ -166,57 +212,27 @@ export class Http2CallStream extends Duplex implements CallStream {
}
delete headers[HTTP2_HEADER_STATUS];
delete headers[HTTP2_HEADER_CONTENT_TYPE];
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
this.cancelWithStatus(Status.UNKNOWN, e.message);
return;
}
this.filterStack.receiveMetadata(Promise.resolve(metadata))
.then(
(finalMetadata) => {
this.emit('metadata', finalMetadata);
},
(error) => {
this.cancelWithStatus(Status.UNKNOWN, error.message);
});
});
stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
let code: Status = this.mappedStatusCode;
let details = '';
if (typeof headers['grpc-status'] === 'string') {
let receivedCode = Number(headers['grpc-status']);
if (receivedCode in Status) {
code = receivedCode;
} else {
code = Status.UNKNOWN;
if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
this.handleTrailers(headers);
} else {
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (error) {
this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()});
return;
}
delete headers['grpc-status'];
}
if (typeof headers['grpc-message'] === 'string') {
details = decodeURI(headers['grpc-message'] as string);
this.handlingHeaders =
this.filterStack.receiveMetadata(Promise.resolve(metadata))
.then((finalMetadata) => {
this.emit('metadata', finalMetadata);
}).catch((error) => {
this.destroyHttp2Stream();
this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()});
});
}
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
metadata = new Metadata();
}
let status: StatusObject = {code, details, metadata};
this.filterStack.receiveTrailers(Promise.resolve(status))
.then(
(finalStatus) => {
this.endCall(finalStatus);
},
(error) => {
this.endCall({
code: Status.INTERNAL,
details: 'Failed to process received status',
metadata: new Metadata()
});
});
});
stream.on('trailers', this.handleTrailers.bind(this));
stream.on('data', (data) => {
let readHead = 0;
let canPush = true;
Expand Down Expand Up @@ -278,7 +294,7 @@ export class Http2CallStream extends Duplex implements CallStream {
this.unpushedReadMessages.push(null);
}
});
stream.on('streamClosed', (errorCode) => {
stream.on('streamClosed', async (errorCode) => {
let code: Status;
let details = '';
switch (errorCode) {
Expand All @@ -299,6 +315,13 @@ export class Http2CallStream extends Duplex implements CallStream {
default:
code = Status.INTERNAL;
}
// This guarantees that if trailers were received, the value of the
// 'grpc-status' header takes precedence for emitted status data.
await this.handlingTrailers;
// This is a no-op if trailers were received at all.
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
// first place.
this.endCall({code: code, details: details, metadata: new Metadata()});
});
stream.on('error', (err: Error) => {
Expand All @@ -323,8 +346,7 @@ export class Http2CallStream extends Duplex implements CallStream {
}
}

cancelWithStatus(status: Status, details: string): void {
this.endCall({code: status, details: details, metadata: new Metadata()});
private destroyHttp2Stream() {
// The http2 stream could already have been destroyed if cancelWithStatus
// is called in response to an internal http2 error.
if (this.http2Stream !== null && !this.http2Stream.destroyed) {
Expand All @@ -334,6 +356,16 @@ export class Http2CallStream extends Duplex implements CallStream {
}
}

cancelWithStatus(status: Status, details: string): void {
this.destroyHttp2Stream();
(async () => {
// If trailers are currently being processed, the call should be ended
// by handleTrailers instead.
await this.handlingTrailers;
this.endCall({code: status, details: details, metadata: new Metadata()});
})()
}

getDeadline(): Deadline {
return this.options.deadline;
}
Expand Down
5 changes: 4 additions & 1 deletion packages/grpc-js-core/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {Status} from './constants';
import {DeadlineFilterFactory} from './deadline-filter';
import {FilterStackFactory} from './filter-stack';
import {Metadata, MetadataObject} from './metadata';
import { MetadataStatusFilterFactory } from './metadata-status-filter';

const IDLE_TIMEOUT_MS = 300000;

Expand Down Expand Up @@ -177,7 +178,9 @@ export class Http2Channel extends EventEmitter implements Channel {
}
this.filterStackFactory = new FilterStackFactory([
new CompressionFilterFactory(this),
new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this)
new CallCredentialsFilterFactory(this),
new DeadlineFilterFactory(this),
new MetadataStatusFilterFactory(this)
]);
this.currentBackoffDeadline = new Date();
/* The only purpose of these lines is to ensure that this.backoffTimerId has
Expand Down
36 changes: 36 additions & 0 deletions packages/grpc-js-core/src/metadata-status-filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import {CallStream} from './call-stream';
import {Channel} from './channel';
import {BaseFilter, Filter, FilterFactory} from './filter';
import {StatusObject} from './call-stream';
import {Status} from './constants';

export class MetadataStatusFilter extends BaseFilter implements Filter {
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
let { code, details, metadata } = await status;
if (code !== Status.UNKNOWN) {
// we already have a known status, so don't assign a new one.
return { code, details, metadata };
}
const metadataMap = metadata.getMap();
if (typeof metadataMap['grpc-status'] === 'string') {
let receivedCode = Number(metadataMap['grpc-status']);
if (receivedCode in Status) {
code = receivedCode;
}
metadata.remove('grpc-status');
}
if (typeof metadataMap['grpc-message'] === 'string') {
details = decodeURI(metadataMap['grpc-message'] as string);
metadata.remove('grpc-message');
}
return { code, details, metadata };
}
}

export class MetadataStatusFilterFactory implements
FilterFactory<MetadataStatusFilter> {
constructor(private readonly channel: Channel) {}
createFilter(callStream: CallStream): MetadataStatusFilter {
return new MetadataStatusFilter();
}
}
2 changes: 1 addition & 1 deletion packages/grpc-js-core/src/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ function isLegalKey(key: string): boolean {
}

function isLegalNonBinaryValue(value: string): boolean {
return !!value.match(/^[ -~]+$/);
return !!value.match(/^[ -~]*$/);
}

function isBinaryKey(key: string): boolean {
Expand Down

0 comments on commit ef616c3

Please sign in to comment.