Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc-js-core: prevent callback before 'end' event and handle eos headers as trailers #132

Merged
merged 2 commits into from
Jan 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 84 additions & 52 deletions packages/grpc-js-core/src/call-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ export class Http2CallStream extends Duplex implements CallStream {
// Status code mapped from :status. To be used if grpc-status is not received
private mappedStatusCode: Status = Status.UNKNOWN;

// Promise objects that are re-assigned to resolving promises when headers
// or trailers received. Processing headers/trailers is asynchronous, so we
// can use these objects to await their completion. This helps us establish
// order of precedence when obtaining the status of the call.
private handlingHeaders = Promise.resolve();
private handlingTrailers = Promise.resolve();

// This is populated (non-null) if and only if the call has ended
private finalStatus: StatusObject|null = null;

Expand All @@ -116,6 +123,11 @@ export class Http2CallStream extends Duplex implements CallStream {
this.filterStack = filterStackFactory.createFilter(this);
}

/**
* On first call, emits a 'status' event with the given StatusObject.
* Subsequent calls are no-ops.
* @param status The status of the call.
*/
private endCall(status: StatusObject): void {
if (this.finalStatus === null) {
this.finalStatus = status;
Expand All @@ -135,12 +147,46 @@ export class Http2CallStream extends Duplex implements CallStream {
return canPush;
}

private handleTrailers(headers: http2.IncomingHttpHeaders) {
let code: Status = this.mappedStatusCode;
let details = '';
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
metadata = new Metadata();
}
let status: StatusObject = {code, details, metadata};
this.handlingTrailers = (async () => {
let finalStatus;
try {
// Attempt to assign final status.
finalStatus = await this.filterStack.receiveTrailers(Promise.resolve(status));
} catch (error) {
await this.handlingHeaders;
// This is a no-op if the call was already ended when handling headers.
this.endCall({
code: Status.INTERNAL,
details: 'Failed to process received status',
metadata: new Metadata()
});
return;
}
// It's possible that headers were received but not fully handled yet.
// Give the headers handler an opportunity to end the call first,
// if an error occurred.
await this.handlingHeaders;
// This is a no-op if the call was already ended when handling headers.
this.endCall(finalStatus);
})();
}

attachHttp2Stream(stream: http2.ClientHttp2Stream): void {
if (this.finalStatus !== null) {
stream.rstWithCancel();
} else {
this.http2Stream = stream;
stream.on('response', (headers) => {
stream.on('response', (headers, flags) => {
switch (headers[HTTP2_HEADER_STATUS]) {
// TODO(murgatroid99): handle 100 and 101
case '400':
Expand All @@ -166,57 +212,27 @@ export class Http2CallStream extends Duplex implements CallStream {
}
delete headers[HTTP2_HEADER_STATUS];
delete headers[HTTP2_HEADER_CONTENT_TYPE];
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
this.cancelWithStatus(Status.UNKNOWN, e.message);
return;
}
this.filterStack.receiveMetadata(Promise.resolve(metadata))
.then(
(finalMetadata) => {
this.emit('metadata', finalMetadata);
},
(error) => {
this.cancelWithStatus(Status.UNKNOWN, error.message);
});
});
stream.on('trailers', (headers: http2.IncomingHttpHeaders) => {
let code: Status = this.mappedStatusCode;
let details = '';
if (typeof headers['grpc-status'] === 'string') {
let receivedCode = Number(headers['grpc-status']);
if (receivedCode in Status) {
code = receivedCode;
} else {
code = Status.UNKNOWN;
if (flags & http2.constants.NGHTTP2_FLAG_END_STREAM) {
this.handleTrailers(headers);
} else {
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (error) {
this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()});
return;
}
delete headers['grpc-status'];
}
if (typeof headers['grpc-message'] === 'string') {
details = decodeURI(headers['grpc-message'] as string);
this.handlingHeaders =
this.filterStack.receiveMetadata(Promise.resolve(metadata))
.then((finalMetadata) => {
this.emit('metadata', finalMetadata);
}).catch((error) => {
this.destroyHttp2Stream();
this.endCall({code: Status.UNKNOWN, details: error.message, metadata: new Metadata()});
});
}
let metadata: Metadata;
try {
metadata = Metadata.fromHttp2Headers(headers);
} catch (e) {
metadata = new Metadata();
}
let status: StatusObject = {code, details, metadata};
this.filterStack.receiveTrailers(Promise.resolve(status))
.then(
(finalStatus) => {
this.endCall(finalStatus);
},
(error) => {
this.endCall({
code: Status.INTERNAL,
details: 'Failed to process received status',
metadata: new Metadata()
});
});
});
stream.on('trailers', this.handleTrailers.bind(this));
stream.on('data', (data) => {
let readHead = 0;
let canPush = true;
Expand Down Expand Up @@ -278,7 +294,7 @@ export class Http2CallStream extends Duplex implements CallStream {
this.unpushedReadMessages.push(null);
}
});
stream.on('streamClosed', (errorCode) => {
stream.on('close', async (errorCode) => {
let code: Status;
let details = '';
switch (errorCode) {
Expand All @@ -299,6 +315,13 @@ export class Http2CallStream extends Duplex implements CallStream {
default:
code = Status.INTERNAL;
}
// This guarantees that if trailers were received, the value of the
// 'grpc-status' header takes precedence for emitted status data.
await this.handlingTrailers;
// This is a no-op if trailers were received at all.
// This is OK, because status codes emitted here correspond to more
// catastrophic issues that prevent us from receiving trailers in the
// first place.
this.endCall({code: code, details: details, metadata: new Metadata()});
});
stream.on('error', (err: Error) => {
Expand All @@ -323,8 +346,7 @@ export class Http2CallStream extends Duplex implements CallStream {
}
}

cancelWithStatus(status: Status, details: string): void {
this.endCall({code: status, details: details, metadata: new Metadata()});
private destroyHttp2Stream() {
// The http2 stream could already have been destroyed if cancelWithStatus
// is called in response to an internal http2 error.
if (this.http2Stream !== null && !this.http2Stream.destroyed) {
Expand All @@ -334,6 +356,16 @@ export class Http2CallStream extends Duplex implements CallStream {
}
}

cancelWithStatus(status: Status, details: string): void {
this.destroyHttp2Stream();
(async () => {
// If trailers are currently being processed, the call should be ended
// by handleTrailers instead.
await this.handlingTrailers;
this.endCall({code: status, details: details, metadata: new Metadata()});
})();
}

getDeadline(): Deadline {
return this.options.deadline;
}
Expand Down
5 changes: 4 additions & 1 deletion packages/grpc-js-core/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {Status} from './constants';
import {DeadlineFilterFactory} from './deadline-filter';
import {FilterStackFactory} from './filter-stack';
import {Metadata, MetadataObject} from './metadata';
import { MetadataStatusFilterFactory } from './metadata-status-filter';

const IDLE_TIMEOUT_MS = 300000;

Expand Down Expand Up @@ -177,7 +178,9 @@ export class Http2Channel extends EventEmitter implements Channel {
}
this.filterStackFactory = new FilterStackFactory([
new CompressionFilterFactory(this),
new CallCredentialsFilterFactory(this), new DeadlineFilterFactory(this)
new CallCredentialsFilterFactory(this),
new DeadlineFilterFactory(this),
new MetadataStatusFilterFactory(this)
]);
this.currentBackoffDeadline = new Date();
/* The only purpose of these lines is to ensure that this.backoffTimerId has
Expand Down
36 changes: 36 additions & 0 deletions packages/grpc-js-core/src/metadata-status-filter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import {CallStream} from './call-stream';
import {Channel} from './channel';
import {BaseFilter, Filter, FilterFactory} from './filter';
import {StatusObject} from './call-stream';
import {Status} from './constants';

export class MetadataStatusFilter extends BaseFilter implements Filter {
async receiveTrailers(status: Promise<StatusObject>): Promise<StatusObject> {
let { code, details, metadata } = await status;
if (code !== Status.UNKNOWN) {
// we already have a known status, so don't assign a new one.
return { code, details, metadata };
}
const metadataMap = metadata.getMap();
if (typeof metadataMap['grpc-status'] === 'string') {
let receivedCode = Number(metadataMap['grpc-status']);
if (receivedCode in Status) {
code = receivedCode;
}
metadata.remove('grpc-status');
}
if (typeof metadataMap['grpc-message'] === 'string') {
details = decodeURI(metadataMap['grpc-message'] as string);
metadata.remove('grpc-message');
}
return { code, details, metadata };
}
}

export class MetadataStatusFilterFactory implements
FilterFactory<MetadataStatusFilter> {
constructor(private readonly channel: Channel) {}
createFilter(callStream: CallStream): MetadataStatusFilter {
return new MetadataStatusFilter();
}
}
2 changes: 1 addition & 1 deletion packages/grpc-js-core/src/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ function isLegalKey(key: string): boolean {
}

function isLegalNonBinaryValue(value: string): boolean {
return !!value.match(/^[ -~]+$/);
return !!value.match(/^[ -~]*$/);
}

function isBinaryKey(key: string): boolean {
Expand Down