Skip to content

Commit

Permalink
Reset event stream unit test (#175)
Browse files Browse the repository at this point in the history
* change version to be 0.22

* 0.22.1

* 0.22.2

* adjust peer dependency wechaty-puppet version to >=0.40

* fix: 🐛 try to stop grpcStream evenif grpcClient does not exist

1. grpcStream method will not fail even if stream does not exist; 2.
grpcStream might exist even if grpcClient does not

✅ Closes: #172

* fix: 🐛 recreate gRPC stream when it already exists

  I think it's better to warn and recreate stream since the upper level
bot does not have direct access to these private method to handle it
properly.

✅ Closes: #172

* fix: 🐛 cancel() is needed when grpc connection breaks

* fix: 🐛 revert 9f0d201

as we close stream in the previos commit, start with event stream should
never happen, thus if somehow this happened, an exception shold be
thrown

* test: 💍 close event stream when grpc breaks

* style: 💄 lint issue

* docs: ✏️ supply required info

* typo

* clean

* add stream grpc unit test

* 1.0.6

Co-authored-by: windmemory <wind.memory.cn@gmail.com>
Co-authored-by: NickWang <brotherstyx@gmail.com>
  • Loading branch information
3 people authored Oct 31, 2021
1 parent ece2543 commit a764bd2
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 53 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "wechaty-puppet-service",
"version": "1.0.5",
"version": "1.0.6",
"description": "Puppet Service for Wechaty",
"type": "module",
"exports": {
Expand Down
51 changes: 31 additions & 20 deletions src/client/grpc-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@ WechatyResolver.setup()

class GrpcClient extends EventEmitter {

#client? : puppet.PuppetClient
get client () : puppet.PuppetClient { return this.#client! }
protected _client? : puppet.PuppetClient
get client () : puppet.PuppetClient { return this._client! }

eventStream? : grpc.ClientReadableStream<puppet.EventResponse>

/**
* gRPC settings
*/
endpoint : string
caCert : Buffer
disableTls : boolean
serverName : string
caCert : Buffer
token : WechatyToken
endpoint : string
serverName : string
token : WechatyToken

constructor (private options: PuppetServiceOptions) {
super()
Expand Down Expand Up @@ -142,19 +142,28 @@ class GrpcClient extends EventEmitter {
/**
* 1. Disconnect from stream
*/
await this.stopStream()
this.stopStream()

/**
* 2. Stop the puppet
*/
await util.promisify(
this.client.stop
.bind(this.client),
)(new puppet.StopRequest())
try {
await util.promisify(
this.client.stop
.bind(this.client),
)(new puppet.StopRequest())
} catch (e) {
this.emit('error', e)
}

/**
* 3. Destroy grpc client
*/
await this.destroyClient()
try {
this.destroyClient()
} catch (e) {
this.emit('error', e)
}
}

protected async initClient (): Promise<void> {
Expand Down Expand Up @@ -196,12 +205,12 @@ class GrpcClient extends EventEmitter {
clientOptions['grpc.default_authority'] = grpcDefaultAuthority
}

if (this.#client) {
if (this._client) {
log.warn('GrpcClient', 'initClient() this.#client exists? Old client has been dropped.')
this.#client = undefined
this._client = undefined
}

this.#client = new puppet.PuppetClient(
this._client = new puppet.PuppetClient(
this.endpoint,
credential,
clientOptions,
Expand All @@ -211,17 +220,17 @@ class GrpcClient extends EventEmitter {
protected destroyClient (): void {
log.verbose('GrpcClient', 'destroyClient()')

if (!this.#client) {
if (!this._client) {
log.warn('GrpcClient', 'destroyClient() this.#client not exist')
return
}

const client = this.#client
const client = this._client
/**
* Huan(202108): we should set `this.client` to `undefined` at the current event loop
* to prevent the future usage of the old client.
*/
this.#client = undefined
this._client = undefined

try {
client.close()
Expand Down Expand Up @@ -303,7 +312,7 @@ class GrpcClient extends EventEmitter {

/**
* Huan(202108): the `heartbeat` event is not guaranteed to be emitted
* if a puppet service provider is coming from the community, and it does not follow the protocol.
* if a puppet service provider is coming from the community, it might not follow the protocol specification.
* So we need a timeout for compatible with those providers
*/
const TIMEOUT = 5 * 1000 // 5 seconds
Expand Down Expand Up @@ -346,7 +355,7 @@ class GrpcClient extends EventEmitter {
log.verbose('GrpcClient', 'stopStream()')

if (!this.eventStream) {
log.verbose('GrpcClient', 'no eventStream when stop, skip destroy.')
log.verbose('GrpcClient', 'stopStream() no eventStream when stop, skip destroy.')
return
}
/**
Expand All @@ -363,7 +372,9 @@ class GrpcClient extends EventEmitter {
*/
// this.eventStream.cancel()

log.verbose('GrpcClient', 'stopStream() eventStream destroying ...')
eventStream.destroy()
log.verbose('GrpcClient', 'stopStream() eventStream destroyed')
}

}
Expand Down
29 changes: 13 additions & 16 deletions src/client/puppet-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,16 @@ export class PuppetService extends PUPPET.Puppet {
log.verbose('PuppetService', 'onStop()')

if (this.recoverSubscription) {
this.recoverSubscription.unsubscribe()
const recoverSubscription = this.recoverSubscription
this.recoverSubscription = undefined
recoverSubscription.unsubscribe()
}

await this._grpc?.stop()
this._grpc = undefined
if (this._grpc) {
const grpc = this._grpc
this._grpc = undefined
await grpc.stop()
}
}

protected hookPayloadStore (): void {
Expand Down Expand Up @@ -180,14 +184,14 @@ export class PuppetService extends PUPPET.Puppet {
log.verbose('PuppetService', 'bridgeGrpcEventStream() eventStream.on(end)')
})
.on('error', (e: unknown) => {
this.emit('error', e)
// https://github.com/wechaty/wechaty-puppet-service/issues/16
log.verbose('PuppetService', 'bridgeGrpcEventStream() eventStream.on(error) %s', e)
const reason = 'bridgeGrpcEventStream() eventStream.on(error) ' + e
// log.verbose('PuppetService', 'bridgeGrpcEventStream() eventStream.on(error) %s', e)
// const reason = 'bridgeGrpcEventStream() eventStream.on(error) ' + e
/**
* Huan(202110): simple reset puppet when grpc client has error? (or not?)
*/
this.emit('error', new Error(reason))
this.wrapAsync(this.reset())
// this.wrapAsync(this.reset())
// /**
// * The `Puppet` class have a throttleQueue for receiving the `reset` events
// * and it's the `Puppet` class's duty for call the `puppet.reset()` to reset the puppet.
Expand Down Expand Up @@ -303,13 +307,7 @@ export class PuppetService extends PUPPET.Puppet {
override async logout (reason = 'logout()'): Promise<void> {
log.verbose('PuppetService', 'logout("%s")', reason)

if (!this.logonoff()) {
log.verbose('PuppetService', 'logout("%s") puppet does not logged in, do nothing. %s',
reason,
new Error().stack,
)
return
}
super.logout(reason)

try {
await util.promisify(
Expand All @@ -318,8 +316,7 @@ export class PuppetService extends PUPPET.Puppet {
)(new grpcPuppet.LogoutRequest())

} catch (e) {
log.error('PuppetService', 'logout() rejection: %s', e && (e as Error).message)
throw e
this.emit('error', e)
}
}

Expand Down
55 changes: 41 additions & 14 deletions src/server/puppet-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,35 +76,47 @@ export class PuppetServer {
}

if (!this.urnRegistry) {
log.verbose('PuppetServer', 'start() initializing FileBox UUID URN Registry ...')
this.urnRegistry = new UniformResourceNameRegistry()
await this.urnRegistry.init()
log.verbose('PuppetServer', 'start() initializing FileBox UUID URN Registry ... done')
}

this.grpcServer = new grpc.Server(GRPC_OPTIONS)

/**
* Connect FileBox with UUID Manager
*/
const FileBoxUuid = uuidifyFileBoxLocal(this.urnRegistry)

log.verbose('PuppetServer', 'start() initializing gRPC Server with options "%s"', JSON.stringify(GRPC_OPTIONS))
this.grpcServer = new grpc.Server(GRPC_OPTIONS)
log.verbose('PuppetServer', 'start() initializing gRPC Server ... done', JSON.stringify(GRPC_OPTIONS))

log.verbose('PuppetServer', 'start() initializing puppet implementation with FileBoxUuid...')
const puppetImpl = puppetImplementation(
this.options.puppet,
FileBoxUuid,
)
log.verbose('PuppetServer', 'start() initializing puppet implementation with FileBoxUuid... done')

log.verbose('PuppetServer', 'start() initializing authorization with token ...')
const puppetImplAuth = authImplToken(this.options.token)(puppetImpl)
this.grpcServer.addService(
grpcPuppet.PuppetService,
puppetImplAuth,
)
log.verbose('PuppetServer', 'start() initializing authorization with token ... done')

log.verbose('PuppetServer', 'start() initializing gRPC health service ...')
const healthImpl = healthImplementation(
this.options.puppet,
)
this.grpcServer.addService(
grpcGoogle.HealthService,
healthImpl,
)
log.verbose('PuppetServer', 'start() initializing gRPC health service ... done')

log.verbose('PuppetServer', 'start() initializing TLS CA ...')
const caCerts = envVars.WECHATY_PUPPET_SERVICE_TLS_CA_CERT()
const caCertBuf = caCerts
? Buffer.from(caCerts)
Expand All @@ -118,6 +130,7 @@ export class PuppetServer {
envVars.WECHATY_PUPPET_SERVICE_TLS_SERVER_KEY(this.options.tls?.serverKey)
|| TLS_INSECURE_SERVER_KEY,
)
log.verbose('PuppetServer', 'start() initializing TLS CA ... done')

const keyCertPairs: grpc.KeyCertPair[] = [{
cert_chain : certChain,
Expand All @@ -129,6 +142,7 @@ export class PuppetServer {
* we introduced the WECHATY_PUPPET_SERVICE_NO_TLS_INSECURE_{SERVER,CLIENT} environment variables.
* if it has been set, then we will run under HTTP instead of HTTPS
*/
log.verbose('PuppetServer', 'start() initializing gRPC server credentials ...')
let credential
if (envVars.WECHATY_PUPPET_SERVICE_NO_TLS_INSECURE_SERVER(this.options.tls?.disable)) {
log.warn('PuppetServer', 'start() TLS disabled: INSECURE!')
Expand All @@ -137,10 +151,12 @@ export class PuppetServer {
log.verbose('PuppetServer', 'start() TLS enabled.')
credential = grpc.ServerCredentials.createSsl(caCertBuf, keyCertPairs)
}
log.verbose('PuppetServer', 'start() initializing gRPC server credentials ... done')

/***
* Start Grpc Server
*/
log.verbose('PuppetServer', 'start() gRPC server starting ...')
const port = await util.promisify(
this.grpcServer.bindAsync
.bind(this.grpcServer),
Expand All @@ -154,29 +170,40 @@ export class PuppetServer {
}

this.grpcServer.start()
log.verbose('PuppetServer', 'start() gRPC server starting ... done')
}

public async stop (): Promise<void> {
log.verbose('PuppetServer', 'stop()')

if (!this.grpcServer) {
throw new Error('no grpc server')
}

await util.promisify(
this.grpcServer.tryShutdown
.bind(this.grpcServer),
)()

const grpcServer = this.grpcServer
setImmediate(() => grpcServer.forceShutdown())
if (this.grpcServer) {
const grpcServer = this.grpcServer
this.grpcServer = undefined

log.verbose('PuppetServer', 'stop() shuting down gRPC server ...')
await util.promisify(
grpcServer.tryShutdown
.bind(grpcServer),
)()
log.verbose('PuppetServer', 'stop() shuting down gRPC server ... done')

try {
grpcServer.forceShutdown()
} catch (e) {
log.warn('PuppetServer', 'stop() grpcServer.forceShutdown() rejection: %s', (e as Error).message)
}

this.grpcServer = undefined
} else {
log.warn('PuppetServer', 'stop() no grpcServer exist')
}

if (this.urnRegistry) {
log.verbose('PuppetServer', 'stop() destory URN Registry ...')
await this.urnRegistry.destroy()
this.urnRegistry = undefined
log.verbose('PuppetServer', 'stop() destory URN Registry ... done')
}

}

}
Loading

0 comments on commit a764bd2

Please sign in to comment.