Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset event stream unit test #175

Merged
merged 16 commits into from
Oct 31, 2021
Merged
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "wechaty-puppet-service",
"version": "1.0.5",
"version": "1.0.6",
"description": "Puppet Service for Wechaty",
"type": "module",
"exports": {
Expand Down
51 changes: 31 additions & 20 deletions src/client/grpc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ WechatyResolver.setup()

class GrpcClient extends EventEmitter {

#client? : puppet.PuppetClient
get client () : puppet.PuppetClient { return this.#client! }
protected _client? : puppet.PuppetClient
get client () : puppet.PuppetClient { return this._client! }

eventStream? : grpc.ClientReadableStream<puppet.EventResponse>

/**
* gRPC settings
*/
endpoint : string
caCert : Buffer
disableTls : boolean
serverName : string
caCert : Buffer
token : WechatyToken
endpoint : string
serverName : string
token : WechatyToken

constructor (private options: PuppetServiceOptions) {
super()
Expand Down Expand Up @@ -142,19 +142,28 @@ class GrpcClient extends EventEmitter {
/**
* 1. Disconnect from stream
*/
await this.stopStream()
this.stopStream()

/**
* 2. Stop the puppet
*/
await util.promisify(
this.client.stop
.bind(this.client),
)(new puppet.StopRequest())
try {
await util.promisify(
this.client.stop
.bind(this.client),
)(new puppet.StopRequest())
} catch (e) {
this.emit('error', e)
}

/**
* 3. Destroy grpc client
*/
await this.destroyClient()
try {
this.destroyClient()
} catch (e) {
this.emit('error', e)
}
}

protected async initClient (): Promise<void> {
Expand Down Expand Up @@ -196,12 +205,12 @@ class GrpcClient extends EventEmitter {
clientOptions['grpc.default_authority'] = grpcDefaultAuthority
}

if (this.#client) {
if (this._client) {
log.warn('GrpcClient', 'initClient() this.#client exists? Old client has been dropped.')
this.#client = undefined
this._client = undefined
}

this.#client = new puppet.PuppetClient(
this._client = new puppet.PuppetClient(
this.endpoint,
credential,
clientOptions,
Expand All @@ -211,17 +220,17 @@ class GrpcClient extends EventEmitter {
protected destroyClient (): void {
log.verbose('GrpcClient', 'destroyClient()')

if (!this.#client) {
if (!this._client) {
log.warn('GrpcClient', 'destroyClient() this.#client not exist')
return
}

const client = this.#client
const client = this._client
/**
* Huan(202108): we should set `this.client` to `undefined` at the current event loop
* to prevent the future usage of the old client.
*/
this.#client = undefined
this._client = undefined

try {
client.close()
Expand Down Expand Up @@ -303,7 +312,7 @@ class GrpcClient extends EventEmitter {

/**
* Huan(202108): the `heartbeat` event is not guaranteed to be emitted
* if a puppet service provider is coming from the community, and it does not follow the protocol.
* if a puppet service provider is coming from the community, it might not follow the protocol specification.
* So we need a timeout for compatible with those providers
*/
const TIMEOUT = 5 * 1000 // 5 seconds
Expand Down Expand Up @@ -346,7 +355,7 @@ class GrpcClient extends EventEmitter {
log.verbose('GrpcClient', 'stopStream()')

if (!this.eventStream) {
log.verbose('GrpcClient', 'no eventStream when stop, skip destroy.')
log.verbose('GrpcClient', 'stopStream() no eventStream when stop, skip destroy.')
return
}
/**
Expand All @@ -363,7 +372,9 @@ class GrpcClient extends EventEmitter {
*/
// this.eventStream.cancel()

log.verbose('GrpcClient', 'stopStream() eventStream destroying ...')
eventStream.destroy()
log.verbose('GrpcClient', 'stopStream() eventStream destroyed')
}

}
Expand Down
29 changes: 13 additions & 16 deletions src/client/puppet-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,16 @@ export class PuppetService extends PUPPET.Puppet {
log.verbose('PuppetService', 'onStop()')

if (this.recoverSubscription) {
this.recoverSubscription.unsubscribe()
const recoverSubscription = this.recoverSubscription
this.recoverSubscription = undefined
recoverSubscription.unsubscribe()
}

await this._grpc?.stop()
this._grpc = undefined
if (this._grpc) {
const grpc = this._grpc
this._grpc = undefined
await grpc.stop()
}
}

protected hookPayloadStore (): void {
Expand Down Expand Up @@ -180,14 +184,14 @@ export class PuppetService extends PUPPET.Puppet {
log.verbose('PuppetService', 'bridgeGrpcEventStream() eventStream.on(end)')
})
.on('error', (e: unknown) => {
this.emit('error', e)
// https://github.com/wechaty/wechaty-puppet-service/issues/16
log.verbose('PuppetService', 'bridgeGrpcEventStream() eventStream.on(error) %s', e)
const reason = 'bridgeGrpcEventStream() eventStream.on(error) ' + e
// log.verbose('PuppetService', 'bridgeGrpcEventStream() eventStream.on(error) %s', e)
// const reason = 'bridgeGrpcEventStream() eventStream.on(error) ' + e
/**
* Huan(202110): simple reset puppet when grpc client has error? (or not?)
*/
this.emit('error', new Error(reason))
this.wrapAsync(this.reset())
// this.wrapAsync(this.reset())
// /**
// * The `Puppet` class have a throttleQueue for receiving the `reset` events
// * and it's the `Puppet` class's duty for call the `puppet.reset()` to reset the puppet.
Expand Down Expand Up @@ -303,13 +307,7 @@ export class PuppetService extends PUPPET.Puppet {
override async logout (reason = 'logout()'): Promise<void> {
log.verbose('PuppetService', 'logout("%s")', reason)

if (!this.logonoff()) {
log.verbose('PuppetService', 'logout("%s") puppet does not logged in, do nothing. %s',
reason,
new Error().stack,
)
return
}
super.logout(reason)

try {
await util.promisify(
Expand All @@ -318,8 +316,7 @@ export class PuppetService extends PUPPET.Puppet {
)(new grpcPuppet.LogoutRequest())

} catch (e) {
log.error('PuppetService', 'logout() rejection: %s', e && (e as Error).message)
throw e
this.emit('error', e)
}
}

Expand Down
55 changes: 41 additions & 14 deletions src/server/puppet-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,35 +76,47 @@ export class PuppetServer {
}

if (!this.urnRegistry) {
log.verbose('PuppetServer', 'start() initializing FileBox UUID URN Registry ...')
this.urnRegistry = new UniformResourceNameRegistry()
await this.urnRegistry.init()
log.verbose('PuppetServer', 'start() initializing FileBox UUID URN Registry ... done')
}

this.grpcServer = new grpc.Server(GRPC_OPTIONS)

/**
* Connect FileBox with UUID Manager
*/
const FileBoxUuid = uuidifyFileBoxLocal(this.urnRegistry)

log.verbose('PuppetServer', 'start() initializing gRPC Server with options "%s"', JSON.stringify(GRPC_OPTIONS))
this.grpcServer = new grpc.Server(GRPC_OPTIONS)
log.verbose('PuppetServer', 'start() initializing gRPC Server ... done', JSON.stringify(GRPC_OPTIONS))

log.verbose('PuppetServer', 'start() initializing puppet implementation with FileBoxUuid...')
const puppetImpl = puppetImplementation(
this.options.puppet,
FileBoxUuid,
)
log.verbose('PuppetServer', 'start() initializing puppet implementation with FileBoxUuid... done')

log.verbose('PuppetServer', 'start() initializing authorization with token ...')
const puppetImplAuth = authImplToken(this.options.token)(puppetImpl)
this.grpcServer.addService(
grpcPuppet.PuppetService,
puppetImplAuth,
)
log.verbose('PuppetServer', 'start() initializing authorization with token ... done')

log.verbose('PuppetServer', 'start() initializing gRPC health service ...')
const healthImpl = healthImplementation(
this.options.puppet,
)
this.grpcServer.addService(
grpcGoogle.HealthService,
healthImpl,
)
log.verbose('PuppetServer', 'start() initializing gRPC health service ... done')

log.verbose('PuppetServer', 'start() initializing TLS CA ...')
const caCerts = envVars.WECHATY_PUPPET_SERVICE_TLS_CA_CERT()
const caCertBuf = caCerts
? Buffer.from(caCerts)
Expand All @@ -118,6 +130,7 @@ export class PuppetServer {
envVars.WECHATY_PUPPET_SERVICE_TLS_SERVER_KEY(this.options.tls?.serverKey)
|| TLS_INSECURE_SERVER_KEY,
)
log.verbose('PuppetServer', 'start() initializing TLS CA ... done')

const keyCertPairs: grpc.KeyCertPair[] = [{
cert_chain : certChain,
Expand All @@ -129,6 +142,7 @@ export class PuppetServer {
* we introduced the WECHATY_PUPPET_SERVICE_NO_TLS_INSECURE_{SERVER,CLIENT} environment variables.
* if it has been set, then we will run under HTTP instead of HTTPS
*/
log.verbose('PuppetServer', 'start() initializing gRPC server credentials ...')
let credential
if (envVars.WECHATY_PUPPET_SERVICE_NO_TLS_INSECURE_SERVER(this.options.tls?.disable)) {
log.warn('PuppetServer', 'start() TLS disabled: INSECURE!')
Expand All @@ -137,10 +151,12 @@ export class PuppetServer {
log.verbose('PuppetServer', 'start() TLS enabled.')
credential = grpc.ServerCredentials.createSsl(caCertBuf, keyCertPairs)
}
log.verbose('PuppetServer', 'start() initializing gRPC server credentials ... done')

/***
* Start Grpc Server
*/
log.verbose('PuppetServer', 'start() gRPC server starting ...')
const port = await util.promisify(
this.grpcServer.bindAsync
.bind(this.grpcServer),
Expand All @@ -154,29 +170,40 @@ export class PuppetServer {
}

this.grpcServer.start()
log.verbose('PuppetServer', 'start() gRPC server starting ... done')
}

public async stop (): Promise<void> {
log.verbose('PuppetServer', 'stop()')

if (!this.grpcServer) {
throw new Error('no grpc server')
}

await util.promisify(
this.grpcServer.tryShutdown
.bind(this.grpcServer),
)()

const grpcServer = this.grpcServer
setImmediate(() => grpcServer.forceShutdown())
if (this.grpcServer) {
const grpcServer = this.grpcServer
this.grpcServer = undefined

log.verbose('PuppetServer', 'stop() shuting down gRPC server ...')
await util.promisify(
grpcServer.tryShutdown
.bind(grpcServer),
)()
log.verbose('PuppetServer', 'stop() shuting down gRPC server ... done')

try {
grpcServer.forceShutdown()
} catch (e) {
log.warn('PuppetServer', 'stop() grpcServer.forceShutdown() rejection: %s', (e as Error).message)
}

this.grpcServer = undefined
} else {
log.warn('PuppetServer', 'stop() no grpcServer exist')
}

if (this.urnRegistry) {
log.verbose('PuppetServer', 'stop() destory URN Registry ...')
await this.urnRegistry.destroy()
this.urnRegistry = undefined
log.verbose('PuppetServer', 'stop() destory URN Registry ... done')
}

}

}
Loading