diff --git a/package.json b/package.json index f19c192c..40075d04 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "wechaty-puppet-service", - "version": "1.0.5", + "version": "1.0.6", "description": "Puppet Service for Wechaty", "type": "module", "exports": { diff --git a/src/client/grpc-client.ts b/src/client/grpc-client.ts index f913619f..2bffa133 100644 --- a/src/client/grpc-client.ts +++ b/src/client/grpc-client.ts @@ -36,19 +36,19 @@ WechatyResolver.setup() class GrpcClient extends EventEmitter { - #client? : puppet.PuppetClient - get client () : puppet.PuppetClient { return this.#client! } + protected _client? : puppet.PuppetClient + get client () : puppet.PuppetClient { return this._client! } eventStream? : grpc.ClientReadableStream /** * gRPC settings */ - endpoint : string + caCert : Buffer disableTls : boolean - serverName : string - caCert : Buffer - token : WechatyToken + endpoint : string + serverName : string + token : WechatyToken constructor (private options: PuppetServiceOptions) { super() @@ -142,19 +142,28 @@ class GrpcClient extends EventEmitter { /** * 1. Disconnect from stream */ - await this.stopStream() + this.stopStream() /** * 2. Stop the puppet */ - await util.promisify( - this.client.stop - .bind(this.client), - )(new puppet.StopRequest()) + try { + await util.promisify( + this.client.stop + .bind(this.client), + )(new puppet.StopRequest()) + } catch (e) { + this.emit('error', e) + } + /** * 3. Destroy grpc client */ - await this.destroyClient() + try { + this.destroyClient() + } catch (e) { + this.emit('error', e) + } } protected async initClient (): Promise { @@ -196,12 +205,12 @@ class GrpcClient extends EventEmitter { clientOptions['grpc.default_authority'] = grpcDefaultAuthority } - if (this.#client) { + if (this._client) { log.warn('GrpcClient', 'initClient() this.#client exists? Old client has been dropped.') - this.#client = undefined + this._client = undefined } - this.#client = new puppet.PuppetClient( + this._client = new puppet.PuppetClient( this.endpoint, credential, clientOptions, @@ -211,17 +220,17 @@ class GrpcClient extends EventEmitter { protected destroyClient (): void { log.verbose('GrpcClient', 'destroyClient()') - if (!this.#client) { + if (!this._client) { log.warn('GrpcClient', 'destroyClient() this.#client not exist') return } - const client = this.#client + const client = this._client /** * Huan(202108): we should set `this.client` to `undefined` at the current event loop * to prevent the future usage of the old client. */ - this.#client = undefined + this._client = undefined try { client.close() @@ -303,7 +312,7 @@ class GrpcClient extends EventEmitter { /** * Huan(202108): the `heartbeat` event is not guaranteed to be emitted - * if a puppet service provider is coming from the community, and it does not follow the protocol. + * if a puppet service provider is coming from the community, it might not follow the protocol specification. * So we need a timeout for compatible with those providers */ const TIMEOUT = 5 * 1000 // 5 seconds @@ -346,7 +355,7 @@ class GrpcClient extends EventEmitter { log.verbose('GrpcClient', 'stopStream()') if (!this.eventStream) { - log.verbose('GrpcClient', 'no eventStream when stop, skip destroy.') + log.verbose('GrpcClient', 'stopStream() no eventStream when stop, skip destroy.') return } /** @@ -363,7 +372,9 @@ class GrpcClient extends EventEmitter { */ // this.eventStream.cancel() + log.verbose('GrpcClient', 'stopStream() eventStream destroying ...') eventStream.destroy() + log.verbose('GrpcClient', 'stopStream() eventStream destroyed') } } diff --git a/src/client/puppet-service.ts b/src/client/puppet-service.ts index 80529ff1..fcc2bf9d 100644 --- a/src/client/puppet-service.ts +++ b/src/client/puppet-service.ts @@ -141,12 +141,16 @@ export class PuppetService extends PUPPET.Puppet { log.verbose('PuppetService', 'onStop()') if (this.recoverSubscription) { - this.recoverSubscription.unsubscribe() + const recoverSubscription = this.recoverSubscription this.recoverSubscription = undefined + recoverSubscription.unsubscribe() } - await this._grpc?.stop() - this._grpc = undefined + if (this._grpc) { + const grpc = this._grpc + this._grpc = undefined + await grpc.stop() + } } protected hookPayloadStore (): void { @@ -180,14 +184,14 @@ export class PuppetService extends PUPPET.Puppet { log.verbose('PuppetService', 'bridgeGrpcEventStream() eventStream.on(end)') }) .on('error', (e: unknown) => { + this.emit('error', e) // https://github.com/wechaty/wechaty-puppet-service/issues/16 - log.verbose('PuppetService', 'bridgeGrpcEventStream() eventStream.on(error) %s', e) - const reason = 'bridgeGrpcEventStream() eventStream.on(error) ' + e + // log.verbose('PuppetService', 'bridgeGrpcEventStream() eventStream.on(error) %s', e) + // const reason = 'bridgeGrpcEventStream() eventStream.on(error) ' + e /** * Huan(202110): simple reset puppet when grpc client has error? (or not?) */ - this.emit('error', new Error(reason)) - this.wrapAsync(this.reset()) + // this.wrapAsync(this.reset()) // /** // * The `Puppet` class have a throttleQueue for receiving the `reset` events // * and it's the `Puppet` class's duty for call the `puppet.reset()` to reset the puppet. @@ -303,13 +307,7 @@ export class PuppetService extends PUPPET.Puppet { override async logout (reason = 'logout()'): Promise { log.verbose('PuppetService', 'logout("%s")', reason) - if (!this.logonoff()) { - log.verbose('PuppetService', 'logout("%s") puppet does not logged in, do nothing. %s', - reason, - new Error().stack, - ) - return - } + super.logout(reason) try { await util.promisify( @@ -318,8 +316,7 @@ export class PuppetService extends PUPPET.Puppet { )(new grpcPuppet.LogoutRequest()) } catch (e) { - log.error('PuppetService', 'logout() rejection: %s', e && (e as Error).message) - throw e + this.emit('error', e) } } diff --git a/src/server/puppet-server.ts b/src/server/puppet-server.ts index 9b6ecb86..fccdbcd8 100644 --- a/src/server/puppet-server.ts +++ b/src/server/puppet-server.ts @@ -76,27 +76,37 @@ export class PuppetServer { } if (!this.urnRegistry) { + log.verbose('PuppetServer', 'start() initializing FileBox UUID URN Registry ...') this.urnRegistry = new UniformResourceNameRegistry() await this.urnRegistry.init() + log.verbose('PuppetServer', 'start() initializing FileBox UUID URN Registry ... done') } - this.grpcServer = new grpc.Server(GRPC_OPTIONS) - /** * Connect FileBox with UUID Manager */ const FileBoxUuid = uuidifyFileBoxLocal(this.urnRegistry) + log.verbose('PuppetServer', 'start() initializing gRPC Server with options "%s"', JSON.stringify(GRPC_OPTIONS)) + this.grpcServer = new grpc.Server(GRPC_OPTIONS) + log.verbose('PuppetServer', 'start() initializing gRPC Server ... done', JSON.stringify(GRPC_OPTIONS)) + + log.verbose('PuppetServer', 'start() initializing puppet implementation with FileBoxUuid...') const puppetImpl = puppetImplementation( this.options.puppet, FileBoxUuid, ) + log.verbose('PuppetServer', 'start() initializing puppet implementation with FileBoxUuid... done') + + log.verbose('PuppetServer', 'start() initializing authorization with token ...') const puppetImplAuth = authImplToken(this.options.token)(puppetImpl) this.grpcServer.addService( grpcPuppet.PuppetService, puppetImplAuth, ) + log.verbose('PuppetServer', 'start() initializing authorization with token ... done') + log.verbose('PuppetServer', 'start() initializing gRPC health service ...') const healthImpl = healthImplementation( this.options.puppet, ) @@ -104,7 +114,9 @@ export class PuppetServer { grpcGoogle.HealthService, healthImpl, ) + log.verbose('PuppetServer', 'start() initializing gRPC health service ... done') + log.verbose('PuppetServer', 'start() initializing TLS CA ...') const caCerts = envVars.WECHATY_PUPPET_SERVICE_TLS_CA_CERT() const caCertBuf = caCerts ? Buffer.from(caCerts) @@ -118,6 +130,7 @@ export class PuppetServer { envVars.WECHATY_PUPPET_SERVICE_TLS_SERVER_KEY(this.options.tls?.serverKey) || TLS_INSECURE_SERVER_KEY, ) + log.verbose('PuppetServer', 'start() initializing TLS CA ... done') const keyCertPairs: grpc.KeyCertPair[] = [{ cert_chain : certChain, @@ -129,6 +142,7 @@ export class PuppetServer { * we introduced the WECHATY_PUPPET_SERVICE_NO_TLS_INSECURE_{SERVER,CLIENT} environment variables. * if it has been set, then we will run under HTTP instead of HTTPS */ + log.verbose('PuppetServer', 'start() initializing gRPC server credentials ...') let credential if (envVars.WECHATY_PUPPET_SERVICE_NO_TLS_INSECURE_SERVER(this.options.tls?.disable)) { log.warn('PuppetServer', 'start() TLS disabled: INSECURE!') @@ -137,10 +151,12 @@ export class PuppetServer { log.verbose('PuppetServer', 'start() TLS enabled.') credential = grpc.ServerCredentials.createSsl(caCertBuf, keyCertPairs) } + log.verbose('PuppetServer', 'start() initializing gRPC server credentials ... done') /*** * Start Grpc Server */ + log.verbose('PuppetServer', 'start() gRPC server starting ...') const port = await util.promisify( this.grpcServer.bindAsync .bind(this.grpcServer), @@ -154,29 +170,40 @@ export class PuppetServer { } this.grpcServer.start() + log.verbose('PuppetServer', 'start() gRPC server starting ... done') } public async stop (): Promise { log.verbose('PuppetServer', 'stop()') - if (!this.grpcServer) { - throw new Error('no grpc server') - } - - await util.promisify( - this.grpcServer.tryShutdown - .bind(this.grpcServer), - )() - - const grpcServer = this.grpcServer - setImmediate(() => grpcServer.forceShutdown()) + if (this.grpcServer) { + const grpcServer = this.grpcServer + this.grpcServer = undefined + + log.verbose('PuppetServer', 'stop() shuting down gRPC server ...') + await util.promisify( + grpcServer.tryShutdown + .bind(grpcServer), + )() + log.verbose('PuppetServer', 'stop() shuting down gRPC server ... done') + + try { + grpcServer.forceShutdown() + } catch (e) { + log.warn('PuppetServer', 'stop() grpcServer.forceShutdown() rejection: %s', (e as Error).message) + } - this.grpcServer = undefined + } else { + log.warn('PuppetServer', 'stop() no grpcServer exist') + } if (this.urnRegistry) { + log.verbose('PuppetServer', 'stop() destory URN Registry ...') await this.urnRegistry.destroy() this.urnRegistry = undefined + log.verbose('PuppetServer', 'stop() destory URN Registry ... done') } + } } diff --git a/tests/grpc-stream.spec.ts b/tests/grpc-stream.spec.ts new file mode 100755 index 00000000..ddd6b07c --- /dev/null +++ b/tests/grpc-stream.spec.ts @@ -0,0 +1,79 @@ +#!/usr/bin/env -S node --no-warnings --loader ts-node/esm +/** + * @hcfw007, https://wechaty.js.org/contributors/wang-nan/ + * related issue: attempt to reconnect gRPC after disconnection + * Scenario: the watchdog tries to restart the service but failed due to the existence of eventstream + * Caused by the grpcClient set to undefined (still working on why this happens) while eventstream still working + * issue: #172, https://github.com/wechaty/puppet-service/issues/172 + * + * NodeJS: How Is Logging Enabled for the @grpc/grpc.js Package + * https://stackoverflow.com/a/60935367/1123955 + * GRPC_VERBOSITY=DEBUG GRPC_TRACE=all + */ + +import { + test, + sinon, +} from 'tstest' +import type { + PuppetOptions, +} from 'wechaty-puppet' +import { + PuppetMock, +} from 'wechaty-puppet-mock' +import getPort from 'get-port' + +import { + PuppetService, + PuppetServer, + PuppetServerOptions, +} from '../src/mod.js' + +test('Close eventStream when gRPC breaks', async t => { + /** + * Huan(202110): + * `insecure_` prefix is required for the TLS version of Puppet Service + * because the `insecure` will be the SNI name of the Puppet Service + * and it will be enforced for the security (required by TLS) + */ + const TOKEN = 'insecure_token' + const PORT = await getPort() + const ENDPOINT = '0.0.0.0:' + PORT + + const puppet = new PuppetMock() + const spyOnStart = sinon.spy(puppet, 'onStart') + /** + * Puppet Server + */ + const serverOptions = { + endpoint: ENDPOINT, + puppet: puppet, + token: TOKEN, + } as PuppetServerOptions + + const puppetServer = new PuppetServer(serverOptions) + await puppetServer.start() + + /** + * Puppet Service Client + */ + const puppetOptions = { + endpoint: ENDPOINT, + token: TOKEN, + } as PuppetOptions + + const puppetService = new PuppetService(puppetOptions) + await puppetService.start() + t.ok(spyOnStart.called, 'should called the puppet server onStart() function') + + puppetService.on('error', console.error) + + // mock grpcClient break + await puppetService.grpc.client.close() + await puppetService.stop() + + // get eventStream status + t.throws(() => puppetService.grpc, 'should clean grpc after stop()') + + await puppetServer.stop() +}) diff --git a/tests/performance.spec.ts b/tests/performance.spec.ts index d99f5a24..551cccb0 100755 --- a/tests/performance.spec.ts +++ b/tests/performance.spec.ts @@ -6,6 +6,10 @@ * * 负载测试,并发测试和压力测试,这三者之前的区别和联系? * https://www.zhihu.com/question/269215477/answer/350162604 + * + * NodeJS: How Is Logging Enabled for the @grpc/grpc.js Package + * https://stackoverflow.com/a/60935367/1123955 + * GRPC_VERBOSITY=DEBUG GRPC_TRACE=all */ import { @@ -122,8 +126,8 @@ test.skip('stress testing', async t => { const actualNameList = resultList.map(payload => payload.name) const EXPECTED_RESULT_LIST = concurrencyList.map(idToName) - t.equals(spy.callCount, CONCURRENCY, `should be called ${CONCURRENCY} times`) - t.deepEqual(actualNameList, EXPECTED_RESULT_LIST, `should get the right result with a huge concurrency ${CONCURRENCY}`) + t.equal(spy.callCount, CONCURRENCY, `should be called ${CONCURRENCY} times`) + t.same(actualNameList, EXPECTED_RESULT_LIST, `should get the right result with a huge concurrency ${CONCURRENCY}`) t.ok(dongList.length > 10, `dongList should receive many dong data (actual: ${dongList.length})`) t.equal(dongList[0], 'interval 0', 'dongList should get the first response from counter 0')