Skip to content

Commit

Permalink
Merge pull request #6 from feywind/otel-process-end
Browse files Browse the repository at this point in the history
fix: change processing span to use ack/nack instead of callback time
  • Loading branch information
feywind committed Apr 4, 2024
2 parents 52a640c + 4c384bc commit 1b19c73
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 182 deletions.
1 change: 1 addition & 0 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ export class LeaseManager extends EventEmitter {
if (this._subscriber.isOpen) {
message.subSpans.flowEnd();
process.nextTick(() => {
message.subSpans.processingStart(this._subscriber.name);
this._subscriber.emit('message', message);
});
}
Expand Down
4 changes: 4 additions & 0 deletions src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ export class Message implements tracing.MessageWithAttributes {
if (!this._handled) {
this._handled = true;
this.subSpans.ackCall();
this.subSpans.processingEnd();
this._subscriber.ack(this);
}
}
Expand Down Expand Up @@ -431,6 +432,7 @@ export class Message implements tracing.MessageWithAttributes {
if (!this._handled) {
this._handled = true;
this.subSpans.ackCall();
this.subSpans.processingEnd();
try {
return await this._subscriber.ackWithResponse(this);
} catch (e) {
Expand Down Expand Up @@ -501,6 +503,7 @@ export class Message implements tracing.MessageWithAttributes {
if (!this._handled) {
this._handled = true;
this.subSpans.nackCall();
this.subSpans.processingEnd();
this._subscriber.nack(this);
}
}
Expand Down Expand Up @@ -530,6 +533,7 @@ export class Message implements tracing.MessageWithAttributes {
if (!this._handled) {
this._handled = true;
this.subSpans.nackCall();
this.subSpans.processingEnd();
try {
return await this._subscriber.nackWithResponse(this);
} catch (e) {
Expand Down
39 changes: 2 additions & 37 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@ import {Topic} from './topic';
import {promisifySome} from './util';
import {StatusError} from './message-stream';
import {DebugMessage} from './debug';
import {EventEmitter} from 'stream';

export {AckError, AckResponse, AckResponses} from './subscriber';

import {EmitterCallback, WrappingEmitter} from './wrapping-emitter';

export type PushConfig = google.pubsub.v1.IPushConfig;
export type OidcToken = google.pubsub.v1.PushConfig.IOidcToken;

Expand Down Expand Up @@ -265,7 +264,7 @@ export declare interface Subscription {
* });
* ```
*/
export class Subscription extends WrappingEmitter {
export class Subscription extends EventEmitter {
// Note: WrappingEmitter is used here to wrap user processing callbacks.
// We do this to be able to build telemetry spans around them.
pubsub: PubSub;
Expand All @@ -279,8 +278,6 @@ export class Subscription extends WrappingEmitter {
constructor(pubsub: PubSub, name: string, options?: SubscriptionOptions) {
super();

this.setEmitterWrapper(this.listenerWrapper.bind(this));

options = options || {};

this.pubsub = pubsub;
Expand Down Expand Up @@ -343,38 +340,6 @@ export class Subscription extends WrappingEmitter {
return Subscription.formatName_(this.pubsub.projectId, this.id_);
}

/**
* This wrapper will be called as part of the emit() process. This lets
* us capture the full time span of processing even if the user is using
* async callbacks.
*
* @private
*/
private listenerWrapper(
eventName: string | symbol,
listener: EmitterCallback,
args: unknown[]
) {
if (eventName !== 'message') {
return listener(...args);
} else {
const message = args[0] as Message;
message.subSpans.processingStart(this.name);

// If the user returned a Promise, that means they used an async handler.
// In that case, we need to tag on to their Promise to end the span.
// Otherwise, the listener chain is sync, and we can close out sync.
const result = listener(...args) as unknown as Promise<void>;
if (result && typeof result.then === 'function') {
result.then(() => {
message.subSpans.processingEnd();
});
} else {
message.subSpans.processingEnd();
}
}
}

/**
* Indicates if the Subscription is open and receiving messages.
*
Expand Down
145 changes: 0 additions & 145 deletions src/wrapping-emitter.ts

This file was deleted.

0 comments on commit 1b19c73

Please sign in to comment.